Skip to content

Commit a08713f

Browse files
Migrate Cassandra extension project (#3913)
* Migrate Cassandra extension project * Add `spring-integration-cassandra` module based on the extension project * Add Java DSL for Cassandra module * Add documentation * Add `CassandraContainerTest` based on a Testcontainers * Fix language in docs Co-authored-by: Gary Russell <[email protected]> * * Fix `reactive-streams.adoc` for a proper link to the new `spring-integration-cassandra` module Co-authored-by: Gary Russell <[email protected]>
1 parent 83f2a9c commit a08713f

34 files changed

+2357
-3
lines changed

build.gradle

+11-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ ext {
110110
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '6.0.0-SNAPSHOT'
111111
springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT'
112112
springWsVersion = '4.0.0-SNAPSHOT'
113-
testcontainersVersion = '1.17.3'
113+
testcontainersVersion = '1.17.5'
114114
tomcatVersion = '10.0.23'
115115
xmlUnitVersion = '2.9.0'
116116
xstreamVersion = '1.4.19'
@@ -498,6 +498,16 @@ project('spring-integration-camel') {
498498
}
499499
}
500500

501+
project('spring-integration-cassandra') {
502+
description = 'Spring Integration Support for Apache Cassandra'
503+
504+
dependencies {
505+
api project(':spring-integration-core')
506+
api 'org.springframework.data:spring-data-cassandra'
507+
508+
testImplementation 'org.testcontainers:cassandra'
509+
}
510+
}
501511

502512
project('spring-integration-core') {
503513
description = 'Spring Integration Core'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
20+
21+
/**
22+
* The namespace handler for "int-cassandra" namespace.
23+
*
24+
* @author Artem Bilan
25+
* @author Filippo Balicchia
26+
*
27+
* @since 6.0
28+
*/
29+
public class CassandraNamespaceHandler extends AbstractIntegrationNamespaceHandler {
30+
31+
@Override
32+
public void init() {
33+
registerBeanDefinitionParser("outbound-channel-adapter", new CassandraOutboundChannelAdapterParser());
34+
registerBeanDefinitionParser("outbound-gateway", new CassandraOutboundGatewayParser());
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import org.w3c.dom.Element;
20+
21+
import org.springframework.beans.factory.support.AbstractBeanDefinition;
22+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
23+
import org.springframework.beans.factory.xml.ParserContext;
24+
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
25+
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
26+
27+
/**
28+
* The parser for the {@code <int-cassandra:outbound-channel-adapter>}.
29+
*
30+
* @author Filippo Balicchia
31+
* @author Artem Bilan
32+
*
33+
* @since 6.0
34+
*/
35+
public class CassandraOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
36+
37+
@Override
38+
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
39+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class);
40+
builder.addPropertyValue("producesReply", false);
41+
CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder);
42+
return builder.getBeanDefinition();
43+
}
44+
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import org.w3c.dom.Element;
20+
21+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
22+
import org.springframework.beans.factory.xml.ParserContext;
23+
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
24+
import org.springframework.integration.config.xml.AbstractConsumerEndpointParser;
25+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
26+
27+
/**
28+
* The parser for the {@code <int-cassandra:outbound-gateway>}.
29+
*
30+
* @author Filippo Balicchia
31+
* @author Artem Bilan
32+
*
33+
* @since 6.0
34+
*/
35+
public class CassandraOutboundGatewayParser extends AbstractConsumerEndpointParser {
36+
37+
38+
@Override
39+
protected String getInputChannelAttributeName() {
40+
return "request-channel";
41+
}
42+
43+
@Override
44+
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
45+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class);
46+
builder.addPropertyValue("producesReply", true);
47+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel");
48+
CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder);
49+
return builder;
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import java.util.List;
20+
21+
import org.w3c.dom.Element;
22+
23+
import org.springframework.beans.factory.config.BeanDefinition;
24+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
25+
import org.springframework.beans.factory.support.ManagedMap;
26+
import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
27+
import org.springframework.beans.factory.xml.ParserContext;
28+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
29+
import org.springframework.util.CollectionUtils;
30+
import org.springframework.util.StringUtils;
31+
import org.springframework.util.xml.DomUtils;
32+
33+
/**
34+
* The {@code int-cassandra} namespace XML parser helper.
35+
*
36+
* @author Filippo Balicchia
37+
* @author Artem Bilan
38+
*
39+
* @since 6.0
40+
*/
41+
public final class CassandraParserUtils {
42+
43+
public static void processOutboundTypeAttributes(Element element, ParserContext parserContext,
44+
BeanDefinitionBuilder builder) {
45+
46+
String cassandraTemplate = element.getAttribute("cassandra-template");
47+
String mode = element.getAttribute("mode");
48+
String ingestQuery = element.getAttribute("ingest-query");
49+
String query = element.getAttribute("query");
50+
51+
if (!StringUtils.hasText(cassandraTemplate)) {
52+
parserContext.getReaderContext().error("cassandra-template is required", element);
53+
}
54+
55+
builder.addConstructorArgReference(cassandraTemplate);
56+
if (StringUtils.hasText(mode)) {
57+
builder.addConstructorArgValue(mode);
58+
}
59+
60+
BeanDefinition statementExpressionDef = IntegrationNamespaceUtils
61+
.createExpressionDefIfAttributeDefined("statement-expression", element);
62+
63+
if (statementExpressionDef != null) {
64+
builder.addPropertyValue("statementExpression", statementExpressionDef);
65+
}
66+
67+
if (!areMutuallyExclusive(query, statementExpressionDef, ingestQuery)) {
68+
parserContext.getReaderContext()
69+
.error("'query', 'ingest-query', 'statement-expression' are mutually exclusive", element);
70+
}
71+
72+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "write-options");
73+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ingest-query");
74+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "query");
75+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "async");
76+
77+
List<Element> parameterExpressions = DomUtils.getChildElementsByTagName(element, "parameter-expression");
78+
if (!CollectionUtils.isEmpty(parameterExpressions)) {
79+
ManagedMap<String, Object> parameterExpressionsMap = new ManagedMap<>();
80+
for (Element parameterExpressionElement : parameterExpressions) {
81+
String name = parameterExpressionElement.getAttribute(AbstractBeanDefinitionParser.NAME_ATTRIBUTE);
82+
BeanDefinition expression =
83+
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined(
84+
IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement);
85+
if (expression != null) {
86+
parameterExpressionsMap.put(name, expression);
87+
}
88+
}
89+
builder.addPropertyValue("parameterExpressions", parameterExpressionsMap);
90+
}
91+
92+
}
93+
94+
public static boolean areMutuallyExclusive(String query, BeanDefinition statementExpressionDef,
95+
String ingestQuery) {
96+
97+
return !StringUtils.hasText(query) && statementExpressionDef == null && !StringUtils.hasText(ingestQuery)
98+
|| !(StringUtils.hasText(query) && statementExpressionDef != null && StringUtils.hasText(ingestQuery))
99+
&& (StringUtils.hasText(query) ^ statementExpressionDef != null) ^ StringUtils.hasText(ingestQuery);
100+
}
101+
102+
private CassandraParserUtils() {
103+
}
104+
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Provides classes for Cassandra parsers and namespace handlers.
19+
*/
20+
package org.springframework.integration.cassandra.config.xml;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.dsl;
18+
19+
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
20+
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
21+
22+
/**
23+
* Factory class for Apache Cassandra components DSL.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 6.0
28+
*/
29+
public final class Cassandra {
30+
31+
/**
32+
* Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}.
33+
* @param cassandraOperations the {@link ReactiveCassandraOperations} to use.
34+
* @return the spec.
35+
*/
36+
public static CassandraMessageHandlerSpec outboundChannelAdapter(ReactiveCassandraOperations cassandraOperations) {
37+
return new CassandraMessageHandlerSpec(cassandraOperations);
38+
}
39+
40+
/**
41+
* Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}.
42+
* @param cassandraOperations the {@link ReactiveCassandraOperations} to use.
43+
* @param queryType the {@link CassandraMessageHandler.Type} to use.
44+
* @return the spec.
45+
*/
46+
public static CassandraMessageHandlerSpec outboundChannelAdapter(ReactiveCassandraOperations cassandraOperations,
47+
CassandraMessageHandler.Type queryType) {
48+
49+
return new CassandraMessageHandlerSpec(cassandraOperations, queryType);
50+
}
51+
52+
/**
53+
* Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}
54+
* in an outbound gateway mode.
55+
* @param cassandraOperations the {@link ReactiveCassandraOperations} to use.
56+
* @return the spec.
57+
*/
58+
public static CassandraMessageHandlerSpec outboundGateway(ReactiveCassandraOperations cassandraOperations) {
59+
return new CassandraMessageHandlerSpec(cassandraOperations)
60+
.producesReply(true);
61+
}
62+
63+
/**
64+
* Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}
65+
* in an outbound gateway mode.
66+
* @param cassandraOperations the {@link ReactiveCassandraOperations} to use.
67+
* @param queryType the {@link CassandraMessageHandler.Type} to use.
68+
* @return the spec.
69+
*/
70+
public static CassandraMessageHandlerSpec outboundGateway(ReactiveCassandraOperations cassandraOperations,
71+
CassandraMessageHandler.Type queryType) {
72+
73+
return new CassandraMessageHandlerSpec(cassandraOperations, queryType)
74+
.producesReply(true);
75+
}
76+
77+
private Cassandra() {
78+
}
79+
80+
}

0 commit comments

Comments
 (0)