diff --git a/build.gradle b/build.gradle index f946e2e2b19..57a215f3fff 100644 --- a/build.gradle +++ b/build.gradle @@ -110,7 +110,7 @@ ext { springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '6.0.0-SNAPSHOT' springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT' springWsVersion = '4.0.0-SNAPSHOT' - testcontainersVersion = '1.17.3' + testcontainersVersion = '1.17.5' tomcatVersion = '10.0.23' xmlUnitVersion = '2.9.0' xstreamVersion = '1.4.19' @@ -497,6 +497,16 @@ project('spring-integration-camel') { } } +project('spring-integration-cassandra') { + description = 'Spring Integration Support for Apache Cassandra' + + dependencies { + api project(':spring-integration-core') + api 'org.springframework.data:spring-data-cassandra' + + testImplementation 'org.testcontainers:cassandra' + } +} project('spring-integration-core') { description = 'Spring Integration Core' diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java new file mode 100644 index 00000000000..d364f4c41ca --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config.xml; + +import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler; + +/** + * The namespace handler for "int-cassandra" namespace. + * + * @author Artem Bilan + * @author Filippo Balicchia + * + * @since 6.0 + */ +public class CassandraNamespaceHandler extends AbstractIntegrationNamespaceHandler { + + @Override + public void init() { + registerBeanDefinitionParser("outbound-channel-adapter", new CassandraOutboundChannelAdapterParser()); + registerBeanDefinitionParser("outbound-gateway", new CassandraOutboundGatewayParser()); + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java new file mode 100644 index 00000000000..f640941788f --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config.xml; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; +import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser; + +/** + * The parser for the {@code }. + * + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +public class CassandraOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser { + + @Override + protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class); + builder.addPropertyValue("producesReply", false); + CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder); + return builder.getBeanDefinition(); + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java new file mode 100644 index 00000000000..5be89011d27 --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config.xml; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; +import org.springframework.integration.config.xml.AbstractConsumerEndpointParser; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; + +/** + * The parser for the {@code }. + * + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +public class CassandraOutboundGatewayParser extends AbstractConsumerEndpointParser { + + + @Override + protected String getInputChannelAttributeName() { + return "request-channel"; + } + + @Override + protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class); + builder.addPropertyValue("producesReply", true); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel"); + CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder); + return builder; + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java new file mode 100644 index 00000000000..f5b1e90e34c --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java @@ -0,0 +1,105 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config.xml; + +import java.util.List; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.ManagedMap; +import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import org.springframework.util.xml.DomUtils; + +/** + * The {@code int-cassandra} namespace XML parser helper. + * + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +public final class CassandraParserUtils { + + public static void processOutboundTypeAttributes(Element element, ParserContext parserContext, + BeanDefinitionBuilder builder) { + + String cassandraTemplate = element.getAttribute("cassandra-template"); + String mode = element.getAttribute("mode"); + String ingestQuery = element.getAttribute("ingest-query"); + String query = element.getAttribute("query"); + + if (!StringUtils.hasText(cassandraTemplate)) { + parserContext.getReaderContext().error("cassandra-template is required", element); + } + + builder.addConstructorArgReference(cassandraTemplate); + if (StringUtils.hasText(mode)) { + builder.addConstructorArgValue(mode); + } + + BeanDefinition statementExpressionDef = IntegrationNamespaceUtils + .createExpressionDefIfAttributeDefined("statement-expression", element); + + if (statementExpressionDef != null) { + builder.addPropertyValue("statementExpression", statementExpressionDef); + } + + if (!areMutuallyExclusive(query, statementExpressionDef, ingestQuery)) { + parserContext.getReaderContext() + .error("'query', 'ingest-query', 'statement-expression' are mutually exclusive", element); + } + + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "write-options"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ingest-query"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "query"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "async"); + + List parameterExpressions = DomUtils.getChildElementsByTagName(element, "parameter-expression"); + if (!CollectionUtils.isEmpty(parameterExpressions)) { + ManagedMap parameterExpressionsMap = new ManagedMap<>(); + for (Element parameterExpressionElement : parameterExpressions) { + String name = parameterExpressionElement.getAttribute(AbstractBeanDefinitionParser.NAME_ATTRIBUTE); + BeanDefinition expression = + IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined( + IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement); + if (expression != null) { + parameterExpressionsMap.put(name, expression); + } + } + builder.addPropertyValue("parameterExpressions", parameterExpressionsMap); + } + + } + + public static boolean areMutuallyExclusive(String query, BeanDefinition statementExpressionDef, + String ingestQuery) { + + return !StringUtils.hasText(query) && statementExpressionDef == null && !StringUtils.hasText(ingestQuery) + || !(StringUtils.hasText(query) && statementExpressionDef != null && StringUtils.hasText(ingestQuery)) + && (StringUtils.hasText(query) ^ statementExpressionDef != null) ^ StringUtils.hasText(ingestQuery); + } + + private CassandraParserUtils() { + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java new file mode 100644 index 00000000000..521362d517c --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides classes for Cassandra parsers and namespace handlers. + */ +package org.springframework.integration.cassandra.config.xml; diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/Cassandra.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/Cassandra.java new file mode 100644 index 00000000000..f5b66b05005 --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/Cassandra.java @@ -0,0 +1,80 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.dsl; + +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; + +/** + * Factory class for Apache Cassandra components DSL. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public final class Cassandra { + + /** + * Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}. + * @param cassandraOperations the {@link ReactiveCassandraOperations} to use. + * @return the spec. + */ + public static CassandraMessageHandlerSpec outboundChannelAdapter(ReactiveCassandraOperations cassandraOperations) { + return new CassandraMessageHandlerSpec(cassandraOperations); + } + + /** + * Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations}. + * @param cassandraOperations the {@link ReactiveCassandraOperations} to use. + * @param queryType the {@link CassandraMessageHandler.Type} to use. + * @return the spec. + */ + public static CassandraMessageHandlerSpec outboundChannelAdapter(ReactiveCassandraOperations cassandraOperations, + CassandraMessageHandler.Type queryType) { + + return new CassandraMessageHandlerSpec(cassandraOperations, queryType); + } + + /** + * Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations} + * in an outbound gateway mode. + * @param cassandraOperations the {@link ReactiveCassandraOperations} to use. + * @return the spec. + */ + public static CassandraMessageHandlerSpec outboundGateway(ReactiveCassandraOperations cassandraOperations) { + return new CassandraMessageHandlerSpec(cassandraOperations) + .producesReply(true); + } + + /** + * Create an instance of {@link CassandraMessageHandlerSpec} for the provided {@link ReactiveCassandraOperations} + * in an outbound gateway mode. + * @param cassandraOperations the {@link ReactiveCassandraOperations} to use. + * @param queryType the {@link CassandraMessageHandler.Type} to use. + * @return the spec. + */ + public static CassandraMessageHandlerSpec outboundGateway(ReactiveCassandraOperations cassandraOperations, + CassandraMessageHandler.Type queryType) { + + return new CassandraMessageHandlerSpec(cassandraOperations, queryType) + .producesReply(true); + } + + private Cassandra() { + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/CassandraMessageHandlerSpec.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/CassandraMessageHandlerSpec.java new file mode 100644 index 00000000000..5a06c0e5958 --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/CassandraMessageHandlerSpec.java @@ -0,0 +1,160 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.dsl; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.cql.WriteOptions; +import org.springframework.expression.Expression; +import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.messaging.Message; + +import com.datastax.oss.driver.api.core.cql.Statement; + +/** + * The {@link MessageHandlerSpec} for {@link CassandraMessageHandler}. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public class CassandraMessageHandlerSpec extends + MessageHandlerSpec { + + private final Map parameterExpressions = new HashMap<>(); + + protected CassandraMessageHandlerSpec(ReactiveCassandraOperations cassandraOperations) { + this.target = new CassandraMessageHandler(cassandraOperations); + } + + protected CassandraMessageHandlerSpec(ReactiveCassandraOperations cassandraOperations, + CassandraMessageHandler.Type queryType) { + + this.target = new CassandraMessageHandler(cassandraOperations, queryType); + } + + protected CassandraMessageHandlerSpec producesReply(boolean producesReply) { + this.target.setProducesReply(producesReply); + return _this(); + } + + /** + * Set an ingest query. + * @param ingestQuery ingest query to use. + * @return this spec + */ + public CassandraMessageHandlerSpec ingestQuery(String ingestQuery) { + this.target.setIngestQuery(ingestQuery); + return _this(); + } + + /** + * Set a {@link WriteOptions} for {@code INSERT}, {@code UPDATE} or {@code DELETE} operations. + * @param writeOptions the {@link WriteOptions} to use. + * @return this spec + */ + public CassandraMessageHandlerSpec writeOptions(WriteOptions writeOptions) { + this.target.setWriteOptions(writeOptions); + return _this(); + } + + /** + * Set a SpEL expression to evaluate a {@link Statement} against request message. + * @param statementExpression the SpEL expression to use. + * @return this spec + */ + public CassandraMessageHandlerSpec statementExpression(String statementExpression) { + return statementExpression(PARSER.parseExpression(statementExpression)); + } + + /** + * Set a SpEL expression to evaluate a {@link Statement} against request message. + * @param statementExpression the SpEL expression to use. + * @return this spec + */ + public CassandraMessageHandlerSpec statementExpression(Expression statementExpression) { + this.target.setStatementExpression(statementExpression); + return _this(); + } + + /** + * Set a {@link Function} to evaluate a {@link Statement} against request message. + * @param statementFunction the function to use. + * @return this spec + */ + public CassandraMessageHandlerSpec statementFunction(Function, Statement> statementFunction) { + this.target.setStatementProcessor(statementFunction::apply); + return _this(); + } + + /** + * Set a {@code SELECT} query. + * @param query the CQL query to execute + * @return this spec + */ + public CassandraMessageHandlerSpec query(String query) { + this.target.setQuery(query); + return _this(); + } + + /** + * Set a map for named parameters and expressions for their values against a request message. + * @param parameterExpressions the map to use. + * @return this spec + */ + public CassandraMessageHandlerSpec parameterExpressions(Map parameterExpressions) { + this.target.setParameterExpressions(parameterExpressions); + return _this(); + } + + /** + * Add a named bindable parameter with a SpEL expression to evaluate its value against a request message. + * @param name the name of parameter. + * @param expression the SpEL expression for parameter value. + * @return this spec + */ + public CassandraMessageHandlerSpec parameter(String name, String expression) { + return parameter(name, PARSER.parseExpression(expression)); + } + + /** + * Add a named bindable parameter with a function to evaluate its value against a request message. + * @param name the name of parameter. + * @param function the function for parameter value. + * @return this spec + */ + public CassandraMessageHandlerSpec parameter(String name, Function, ?> function) { + return parameter(name, new FunctionExpression<>(function)); + } + + /** + * Add a named bindable parameter with a SpEL expression to evaluate its value against a request message. + * @param name the name of parameter. + * @param expression the SpEL expression for parameter value. + * @return this spec + */ + public CassandraMessageHandlerSpec parameter(String name, Expression expression) { + this.parameterExpressions.put(name, expression); + return parameterExpressions(this.parameterExpressions); + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java new file mode 100644 index 00000000000..2eaba53889e --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides Apache Cassandra Components support for the Java DSL. + */ +package org.springframework.integration.cassandra.dsl; diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java new file mode 100644 index 00000000000..fd0c03addf8 --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java @@ -0,0 +1,359 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.outbound; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.StreamSupport; + +import org.springframework.dao.DataAccessException; +import org.springframework.data.cassandra.ReactiveResultSet; +import org.springframework.data.cassandra.ReactiveSession; +import org.springframework.data.cassandra.core.InsertOptions; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.UpdateOptions; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.data.cassandra.core.cql.QueryOptionsUtil; +import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback; +import org.springframework.data.cassandra.core.cql.WriteOptions; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.TypeLocator; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.expression.spel.support.StandardTypeLocator; +import org.springframework.integration.expression.ExpressionEvalMap; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; +import org.springframework.integration.handler.MessageProcessor; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; +import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * An {@link AbstractReplyProducingMessageHandler} implementation for Cassandra outbound operations. + * + * @author Soby Chacko + * @author Artem Bilan + * @author Filippo Balicchia + * + * @since 6.0 + */ +public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler { + + private final Map parameterExpressions = new HashMap<>(); + + private Type mode; + + private final ReactiveCassandraOperations cassandraOperations; + + private boolean producesReply; + + /** + * Prepared statement to use in association with high throughput ingestion. + */ + private String ingestQuery; + + /** + * Various options that can be used for Cassandra writes. + */ + private WriteOptions writeOptions = WriteOptions.empty(); + + private ReactiveSessionMessageCallback sessionMessageCallback; + + private EvaluationContext evaluationContext; + + public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) { + this(cassandraOperations, Type.INSERT); + } + + public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations, + CassandraMessageHandler.Type queryType) { + + Assert.notNull(cassandraOperations, "'cassandraOperations' must not be null."); + Assert.notNull(queryType, "'queryType' must not be null."); + this.cassandraOperations = cassandraOperations; + this.mode = queryType; + setAsync(true); + switch (this.mode) { + + case INSERT: + this.writeOptions = InsertOptions.empty(); + break; + case UPDATE: + this.writeOptions = UpdateOptions.empty(); + break; + } + } + + public void setIngestQuery(String ingestQuery) { + Assert.hasText(ingestQuery, "'ingestQuery' must not be empty"); + this.ingestQuery = ingestQuery; + this.mode = Type.INSERT; + } + + public void setWriteOptions(WriteOptions writeOptions) { + Assert.notNull(writeOptions, "'writeOptions' must not be null"); + this.writeOptions = writeOptions; + } + + public void setProducesReply(boolean producesReply) { + this.producesReply = producesReply; + } + + public void setStatementExpressionString(String statementExpression) { + setStatementExpression(EXPRESSION_PARSER.parseExpression(statementExpression)); + } + + @SuppressWarnings("unchecked") + public void setStatementExpression(Expression statementExpression) { + ExpressionEvaluatingMessageProcessor expressionEvaluatingMessageProcessor = + new ExpressionEvaluatingMessageProcessor<>(statementExpression, Statement.class) { + + @Override + protected StandardEvaluationContext getEvaluationContext() { + return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext; + } + + }; + setStatementProcessor((ExpressionEvaluatingMessageProcessor>) expressionEvaluatingMessageProcessor); + } + + public void setQuery(String query) { + Assert.hasText(query, "'query' must not be empty"); + this.sessionMessageCallback = + (session, requestMessage) -> + session.execute(query, + ExpressionEvalMap.from(this.parameterExpressions) + .usingEvaluationContext(this.evaluationContext) + .withRoot(requestMessage) + .build()); + this.mode = Type.STATEMENT; + } + + public void setParameterExpressions(Map parameterExpressions) { + Assert.notEmpty(parameterExpressions, "'parameterExpressions' must not be empty."); + this.parameterExpressions.clear(); + this.parameterExpressions.putAll(parameterExpressions); + } + + public void setStatementProcessor(MessageProcessor> statementProcessor) { + Assert.notNull(statementProcessor, "'statementProcessor' must not be null."); + this.sessionMessageCallback = + (session, requestMessage) -> + session.execute( + QueryOptionsUtil.addQueryOptions(statementProcessor.processMessage(requestMessage), + this.writeOptions)); + this.mode = Type.STATEMENT; + } + + @Override + public String getComponentType() { + return "cassandra:outbound-" + (this.producesReply ? "gateway" : "channel-adapter"); + } + + @Override + protected void doInit() { + super.doInit(); + + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + TypeLocator typeLocator = this.evaluationContext.getTypeLocator(); + if (typeLocator instanceof StandardTypeLocator) { + /* + * Register the Cassandra Query DSL package so they don't need a FQCN for QueryBuilder, for example. + */ + ((StandardTypeLocator) typeLocator).registerImport(QueryBuilder.class.getPackage().getName()); + } + } + + @Override + protected Object handleRequestMessage(Message requestMessage) { + Object payload = requestMessage.getPayload(); + + Mono result = null; + + Type mode = this.mode; + + if (payload instanceof Statement) { + mode = Type.STATEMENT; + } + + switch (mode) { + case INSERT: + result = handleInsert(payload); + break; + case UPDATE: + result = handleUpdate(payload); + break; + case DELETE: + result = handleDelete(payload); + break; + case STATEMENT: + result = handleStatement(requestMessage); + break; + } + + if (this.producesReply) { + return isAsync() ? result : result.block(); + } + else { + if (isAsync()) { + result.subscribe(); + } + else { + result.block(); + } + return null; + } + } + + @SuppressWarnings("unchecked") + private Mono handleInsert(Object payload) { + if (this.ingestQuery != null) { + Assert.isInstanceOf(List.class, payload, + "to perform 'ingest' the 'payload' must be of 'List>' type."); + List list = (List) payload; + for (Object o : list) { + Assert.isInstanceOf(List.class, o, + "to perform 'ingest' the 'payload' must be of 'List>' type."); + } + List> rows = (List>) payload; + return this.cassandraOperations.getReactiveCqlOperations() + .execute((ReactiveSessionCallback) session -> + session.prepare( + QueryOptionsUtil.addQueryOptions(SimpleStatement.newInstance(this.ingestQuery), + this.writeOptions)) + .flatMapMany(s -> + Flux.fromIterable(rows) + .map(row -> s.bind(row.toArray()))) + .collect(() -> new BatchStatementBuilder(BatchType.UNLOGGED), + BatchStatementBuilder::addStatement) + .map(BatchStatementBuilder::build) + .flatMap(session::execute) + .transform(this::transformToWriteResult)) + .next(); + } + else { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .insert((List) payload, this.writeOptions) + .execute(); + } + else { + return this.cassandraOperations.insert(payload, (InsertOptions) this.writeOptions); + } + } + } + + private Mono handleUpdate(Object payload) { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .update((List) payload, this.writeOptions) + .execute(); + } + else { + return this.cassandraOperations.update(payload, (UpdateOptions) this.writeOptions); + } + } + + private Mono handleDelete(Object payload) { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .delete((List) payload) + .execute(); + } + else { + return this.cassandraOperations.delete(payload, this.writeOptions); + } + } + + private Mono handleStatement(Message requestMessage) { + Object payload = requestMessage.getPayload(); + Mono resultSetMono; + if (payload instanceof Statement) { + resultSetMono = this.cassandraOperations.getReactiveCqlOperations() + .queryForResultSet((Statement) payload); + } + else { + resultSetMono = this.cassandraOperations.getReactiveCqlOperations() + .execute((ReactiveSessionCallback) session -> + this.sessionMessageCallback.doInSession(session, requestMessage)) + .next(); + } + + return resultSetMono.transform(this::transformToWriteResult); + } + + private Mono transformToWriteResult(Mono resultSetMono) { + return resultSetMono.map(ReactiveWriteResult::new); + } + + /** + * The mode for the {@link CassandraMessageHandler}. + */ + public enum Type { + + /** + * Set a {@link CassandraMessageHandler} into an {@code insert} mode. + */ + INSERT, + + /** + * Set a {@link CassandraMessageHandler} into an {@code update} mode. + */ + UPDATE, + + /** + * Set a {@link CassandraMessageHandler} into a {@code delete} mode. + */ + DELETE, + + /** + * Set a {@link CassandraMessageHandler} into a {@code statement} mode. + */ + STATEMENT; + + } + + @FunctionalInterface + private interface ReactiveSessionMessageCallback { + + Mono doInSession(ReactiveSession session, Message requestMessage) + throws DriverException, DataAccessException; + + } + + private final class ReactiveWriteResult extends WriteResult { + + ReactiveWriteResult(ReactiveResultSet reactiveResultSet) { + super(reactiveResultSet.getAllExecutionInfo(), + reactiveResultSet.wasApplied(), + StreamSupport.stream(reactiveResultSet.availableRows().toIterable().spliterator(), false).toList()); + } + + } + +} diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java new file mode 100644 index 00000000000..71c8531c65b --- /dev/null +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides classes supporting Cassandra outbound endpoints. + */ +package org.springframework.integration.cassandra.outbound; diff --git a/spring-integration-cassandra/src/main/resources/META-INF/spring.handlers b/spring-integration-cassandra/src/main/resources/META-INF/spring.handlers new file mode 100644 index 00000000000..48cebcf0835 --- /dev/null +++ b/spring-integration-cassandra/src/main/resources/META-INF/spring.handlers @@ -0,0 +1 @@ +http\://www.springframework.org/schema/integration/cassandra=org.springframework.integration.cassandra.config.xml.CassandraNamespaceHandler diff --git a/spring-integration-cassandra/src/main/resources/META-INF/spring.schemas b/spring-integration-cassandra/src/main/resources/META-INF/spring.schemas new file mode 100644 index 00000000000..7ee584bf09f --- /dev/null +++ b/spring-integration-cassandra/src/main/resources/META-INF/spring.schemas @@ -0,0 +1,3 @@ +http\://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd=org/springframework/integration/config/xml/spring-integration-cassandra.xsd +https\://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd=org/springframework/integration/config/xml/spring-integration-cassandra.xsd + diff --git a/spring-integration-cassandra/src/main/resources/META-INF/spring.tooling b/spring-integration-cassandra/src/main/resources/META-INF/spring.tooling new file mode 100644 index 00000000000..7bb885685c5 --- /dev/null +++ b/spring-integration-cassandra/src/main/resources/META-INF/spring.tooling @@ -0,0 +1,4 @@ +# Tooling related information for the integration cassandra namespace +http\://www.springframework.org/schema/integration/cassandra@name=integration cassandra Namespace +http\://www.springframework.org/schema/integration/cassandra@prefix=int-cassandra +http\://www.springframework.org/schema/integration/cassandra@icon=org/springframework/integration/config/xml/spring-integration-cassandra.gif diff --git a/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.gif b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.gif new file mode 100644 index 00000000000..750667e608f Binary files /dev/null and b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.gif differ diff --git a/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.xsd b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.xsd new file mode 100644 index 00000000000..989bcef2a28 --- /dev/null +++ b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra.xsd @@ -0,0 +1,228 @@ + + + + + + + + + + + + + + Defines cassandra outbound channel adapter that + writes the contents of the + Message into Cassandra cluster + + + + + + + + + + Specify an expression for parameter variable placeholder in cql statement. + + + + + + + + + + + + + + Defines cassandra outbound gateway that + writes the contents of the + Message into Cassandra cluster + + + + + + + + + + Specify an expression for parameter variable placeholder in cql statement. + + + + + + + + Message Channel to which replies should be sent after being received from Cassandra + cluster. + + + + + + + + + + + + Unique ID for this gateway. + + + + + + + Message Channel to which Messages should be sent to Cassandra. + + + + + + + + + + + + + + + + + + Common configuration for cassandra adapters. + + + + + + + + + + Reference to an instance of + 'org.springframework.data.cassandra.core.ReactiveCassandraOperations'. + + + + + + + + + + + + Reference to an instance of + 'org.springframework.data.cassandra.core.cql.WriteOptions' + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Statement to use in prepared statement + + + + + + + Statement expression that represent an executable query + + + + + + + ' async, reactive manner on the downstream + 'FluxMessageChannel' subscription or via 'Mono.subscribe()' in the handler, if one-way. + Otherwise the 'Mono.block()' is called immediately before returning from the handler. + ]]> + + + + + + + + + + + + Expression to be evaluated against the Message to replace a query Parameter + + + + + + Name of the placeholder to be replaced + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/CassandraContainerTest.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/CassandraContainerTest.java new file mode 100644 index 00000000000..422dbe42dd6 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/CassandraContainerTest.java @@ -0,0 +1,44 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * The base contract for JUnit tests based on the container for Apache Cassandra. + * The Testcontainers 'reuse' option must be disabled,so, Ryuk container is started + * and will clean all the containers up from this test suite after JVM exit. + * Since the MqSQL container instance is shared via static property, it is going to be + * started only once per JVM, therefore the target Docker container is reused automatically. + * + * @author Artem Bilan + * + * @since 6.0 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface CassandraContainerTest { + + CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer<>("cassandra:4.1"); + + @BeforeAll + static void startContainer() { + CASSANDRA_CONTAINER.start(); + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/IntegrationTestConfig.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/IntegrationTestConfig.java new file mode 100644 index 00000000000..a8e773a7b36 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/IntegrationTestConfig.java @@ -0,0 +1,86 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra; + + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.springframework.context.annotation.Configuration; +import org.springframework.data.cassandra.config.AbstractReactiveCassandraConfiguration; +import org.springframework.data.cassandra.config.SchemaAction; +import org.springframework.data.cassandra.core.cql.keyspace.CreateKeyspaceSpecification; +import org.springframework.integration.cassandra.test.domain.Book; + +/** + * Setup any spring configuration for unit tests. + * Must be used in combination with {@link CassandraContainerTest}. + * + * @author David Webb + * @author Matthew T. Adams + * @author Artem Bilan + * + * @since 6.0 + */ +@Configuration +public class IntegrationTestConfig extends AbstractReactiveCassandraConfiguration { + + public String keyspaceName = randomKeyspaceName(); + + public static String randomKeyspaceName() { + return "ks" + UUID.randomUUID().toString().replace("-", ""); + } + + @Override + protected String getContactPoints() { + return CassandraContainerTest.CASSANDRA_CONTAINER.getContactPoint().getHostName(); + } + + @Override + protected int getPort() { + return CassandraContainerTest.CASSANDRA_CONTAINER.getContactPoint().getPort(); + } + + @Override + public SchemaAction getSchemaAction() { + return SchemaAction.RECREATE; + } + + @Override + protected String getKeyspaceName() { + return this.keyspaceName; + } + + @Override + protected List getKeyspaceCreations() { + return Collections.singletonList( + CreateKeyspaceSpecification.createKeyspace(getKeyspaceName()) + .withSimpleReplication()); + } + + @Override + protected String getLocalDataCenter() { + return CassandraContainerTest.CASSANDRA_CONTAINER.getLocalDatacenter(); + } + + @Override + public String[] getEntityBasePackages() { + return new String[]{ Book.class.getPackage().getName() }; + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml new file mode 100644 index 00000000000..cf66eb13769 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java new file mode 100644 index 00000000000..d9d1029a78a --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java @@ -0,0 +1,156 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.ImportResource; +import org.springframework.data.cassandra.core.ReactiveCassandraTemplate; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.integration.cassandra.CassandraContainerTest; +import org.springframework.integration.cassandra.IntegrationTestConfig; +import org.springframework.integration.cassandra.test.domain.Book; +import org.springframework.integration.cassandra.test.domain.BookSampler; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.select.Select; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +@SpringJUnitConfig(CassandraOutboundAdapterIntegrationTests.Config.class) +@DirtiesContext +class CassandraOutboundAdapterIntegrationTests implements CassandraContainerTest { + + @Autowired + private ReactiveCassandraTemplate cassandraTemplate; + + @Autowired + private DirectChannel cassandraMessageHandler1; + + @Autowired + private DirectChannel cassandraMessageHandler2; + + @Autowired + private DirectChannel cassandraMessageHandler3; + + @Autowired + private DirectChannel cassandraMessageHandler4; + + @Autowired + private DirectChannel inputChannel; + + @Autowired + private FluxMessageChannel resultChannel; + + @Test + void testBasicCassandraInsert() { + Book b1 = BookSampler.getBook(); + Message message = MessageBuilder.withPayload(b1).build(); + this.cassandraMessageHandler1.send(message); + Select select = QueryBuilder.selectFrom("book").all(); + List books = this.cassandraTemplate.select(select.build(), Book.class).collectList().block(); + assertThat(books).hasSize(1); + this.cassandraTemplate.delete(b1); + } + + @Test + void testCassandraBatchInsertAndSelectStatement() { + List books = BookSampler.getBookList(5); + this.cassandraMessageHandler2.send(new GenericMessage<>(books)); + Message message = MessageBuilder.withPayload("Cassandra Puppy Guru").setHeader("limit", 2).build(); + this.inputChannel.send(message); + + Mono testMono = + Mono.from(this.resultChannel) + .map(Message::getPayload) + .cast(WriteResult.class) + .map(r -> r.getRows().size()); + + StepVerifier.create(testMono) + .expectNext(2) + .expectComplete() + .verify(); + + this.cassandraMessageHandler1.send(new GenericMessage<>(QueryBuilder.truncate("book").build())); + + } + + @Test + void testCassandraBatchIngest() { + List books = BookSampler.getBookList(5); + List> ingestBooks = new ArrayList<>(); + for (Book b : books) { + + List l = new ArrayList<>(); + l.add(b.isbn()); + l.add(b.title()); + l.add(b.author()); + l.add(b.pages()); + l.add(b.saleDate()); + l.add(b.isInStock()); + ingestBooks.add(l); + } + + Message>> message = MessageBuilder.withPayload(ingestBooks).build(); + this.cassandraMessageHandler3.send(message); + Select select = QueryBuilder.selectFrom("book").all(); + books = this.cassandraTemplate.select(select.build(), Book.class).collectList().block(); + assertThat(books).hasSize(5); + this.cassandraTemplate.batchOps().delete(books); + } + + @Test + void testExpressionTruncate() { + Message message = MessageBuilder.withPayload(BookSampler.getBook()).build(); + this.cassandraMessageHandler1.send(message); + Select select = QueryBuilder.selectFrom("book").all(); + List books = this.cassandraTemplate.select(select.build(), Book.class).collectList().block(); + assertThat(books).hasSize(1); + this.cassandraMessageHandler4.send(MessageBuilder.withPayload("Empty").build()); + books = this.cassandraTemplate.select(select.build(), Book.class).collectList().block(); + assertThat(books).hasSize(0); + } + + @Configuration + @EnableIntegration + @ImportResource("org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml") + public static class Config extends IntegrationTestConfig { + + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml new file mode 100644 index 00000000000..804fe766eb3 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java new file mode 100644 index 00000000000..e9f686c432f --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +@SpringJUnitConfig +class CassandraOutboundAdapterParserTests { + + @Autowired + private ApplicationContext context; + + @Test + void minimalConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound1.adapter"), "handler", + CassandraMessageHandler.class); + + assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound1.adapter"); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.INSERT); + assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations")) + .isSameAs(this.context.getBean("cassandraTemplate")); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); + assertThat(TestUtils.getPropertyValue(handler, "async", Boolean.class)).isFalse(); + } + + @Test + void ingestConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound2"), "handler", + CassandraMessageHandler.class); + + assertThat(TestUtils.getPropertyValue(handler, "ingestQuery")) + .isEqualTo("insert into book (isbn, title, author, pages, saleDate, isInStock) " + + "values (?, ?, ?, ?, ?, ?)"); + assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isFalse(); + } + + @Test + void fullConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outgateway"), "handler", + CassandraMessageHandler.class); + + assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isTrue(); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); + } + + @Test + void statementConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound4.adapter"), "handler", + CassandraMessageHandler.class); + + assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound4.adapter"); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT); + assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations")) + .isSameAs(this.context.getBean("cassandraTemplate")); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java new file mode 100644 index 00000000000..118e87e0523 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.integration.cassandra.config.xml.CassandraParserUtils; + +/** + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +class CassandraParserUtilsTests { + + @Test + void mutuallyExclusiveCase1() { + String query = ""; + BeanDefinition statementExpressionDef = null; + String ingestQuery = ""; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); + } + + @Test + void mutuallyExclusiveCase2() { + String query = ""; + BeanDefinition statementExpressionDef = null; + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); + } + + @Test + void mutuallyExclusiveCase3() { + String query = ""; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = ""; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); + } + + @Test + void mutuallyExclusiveCase4() { + String query = ""; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); + } + + @Test + void mutuallyExclusiveCase5() { + String query = "SELECT * FROM book limit :size"; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = ""; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); + } + + @Test + void mutuallyExclusiveCase6() { + String query = "SELECT * FROM book limit :size"; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); + } + + @Test + void mutuallyExclusiveCase7() { + String query = "SELECT * FROM book limit :size"; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = ""; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); + } + + @Test + void mutuallyExclusiveCase8() { + String query = "SELECT * FROM book limit :size"; + BeanDefinition statementExpressionDef = new RootBeanDefinition(); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/dsl/CassandraDslTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/dsl/CassandraDslTests.java new file mode 100644 index 00000000000..0eaa855fba1 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/dsl/CassandraDslTests.java @@ -0,0 +1,128 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.dsl; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.cassandra.core.InsertOptions; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.integration.cassandra.CassandraContainerTest; +import org.springframework.integration.cassandra.IntegrationTestConfig; +import org.springframework.integration.cassandra.test.domain.BookSampler; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Artem Bilan + * + * @since 6.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class CassandraDslTests implements CassandraContainerTest { + + @Autowired + @Qualifier("cassandraTruncateFlow.input") + MessageChannel cassandraTruncateFlowInput; + + @Autowired + @Qualifier("cassandraInsertFlow.input") + MessageChannel cassandraInsertFlowInput; + + @Autowired + @Qualifier("cassandraSelectFlow.input") + MessageChannel cassandraSelectFlowInput; + + @Autowired + FluxMessageChannel resultChannel; + + @Test + void testCassandraDslConfiguration() { + this.cassandraInsertFlowInput.send(new GenericMessage<>(BookSampler.getBookList(5))); + + Mono testMono = + Mono.from(this.resultChannel) + .map(Message::getPayload) + .cast(WriteResult.class) + .map(r -> r.getRows().size()); + + StepVerifier stepVerifier = StepVerifier.create(testMono) + .expectNext(1) + .expectComplete() + .verifyLater(); + + this.cassandraSelectFlowInput.send(MessageBuilder.withPayload("Cassandra Guru").setHeader("limit", 2).build()); + + stepVerifier.verify(Duration.ofSeconds(10)); + + this.cassandraTruncateFlowInput.send(new GenericMessage<>("")); + } + + @Configuration + @EnableIntegration + public static class Config extends IntegrationTestConfig { + + @Bean + IntegrationFlow cassandraTruncateFlow(ReactiveCassandraOperations cassandraOperations) { + return flow -> flow + .handle(Cassandra.outboundChannelAdapter(cassandraOperations) + .statementExpression("T(QueryBuilder).truncate('book').build()"), + e -> e.async(false)); + } + + @Bean + IntegrationFlow cassandraInsertFlow(ReactiveCassandraOperations cassandraOperations) { + return flow -> flow + .handle(Cassandra.outboundChannelAdapter(cassandraOperations) + .writeOptions(InsertOptions.builder() + .ttl(60) + .consistencyLevel(ConsistencyLevel.ONE) + .build()), + e -> e.async(false)); + } + + @Bean + IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) { + return flow -> flow + .handle(Cassandra.outboundGateway(cassandraOperations) + .query("SELECT * FROM book WHERE author = :author limit :size") + .parameter("author", "payload") + .parameter("size", m -> m.getHeaders().get("limit"))) + .channel(c -> c.flux("resultChannel")); + } + + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java new file mode 100644 index 00000000000..459d978ed13 --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java @@ -0,0 +1,217 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.outbound; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.cassandra.core.CassandraOperations; +import org.springframework.data.cassandra.core.InsertOptions; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.data.cassandra.core.cql.WriteOptions; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.cassandra.CassandraContainerTest; +import org.springframework.integration.cassandra.IntegrationTestConfig; +import org.springframework.integration.cassandra.test.domain.Book; +import org.springframework.integration.cassandra.test.domain.BookSampler; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.select.Select; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +/** + * @author Soby Chacko + * @author Artem Bilan + * + * @since 6.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class CassandraMessageHandlerTests implements CassandraContainerTest { + + private static final SpelExpressionParser PARSER = new SpelExpressionParser(); + + @Autowired + public MessageHandler cassandraMessageHandler1; + + @Autowired + public MessageHandler cassandraMessageHandler2; + + @Autowired + public MessageHandler cassandraMessageHandler3; + + @Autowired + public MessageHandler cassandraMessageHandler4; + + @Autowired + public CassandraOperations template; + + @Autowired + public FluxMessageChannel resultChannel; + + @Test + void testBasicCassandraInsert() { + Book b1 = BookSampler.getBook(); + + Message message = MessageBuilder.withPayload(b1).build(); + this.cassandraMessageHandler1.handleMessage(message); + + Select select = QueryBuilder.selectFrom("book").all(); + List books = this.template.select(select.build(), Book.class); + assertThat(books).hasSize(1); + + this.template.delete(b1); + } + + @Test + void testCassandraBatchInsertAndSelectStatement() { + List books = BookSampler.getBookList(5); + + this.cassandraMessageHandler2.handleMessage(new GenericMessage<>(books)); + + Message message = MessageBuilder.withPayload("Cassandra Guru").setHeader("limit", 2).build(); + this.cassandraMessageHandler4.handleMessage(message); + + Mono testMono = + Mono.from(this.resultChannel) + .map(Message::getPayload) + .cast(WriteResult.class) + .map(r -> r.getRows().size()); + + StepVerifier.create(testMono) + .expectNext(1) + .expectComplete() + .verify(); + + this.cassandraMessageHandler1.handleMessage(new GenericMessage<>(QueryBuilder.truncate("book").build())); + } + + @Test + void testCassandraBatchIngest() { + List books = BookSampler.getBookList(5); + List> ingestBooks = + books.stream() + .map(book -> + List.of( + book.isbn(), + book.title(), + book.author(), + book.pages(), + book.saleDate(), + book.isInStock())) + .toList(); + + this.cassandraMessageHandler3.handleMessage(MessageBuilder.withPayload(ingestBooks).build()); + + Select select = QueryBuilder.selectFrom("book").all(); + books = this.template.select(select.build(), Book.class); + assertThat(books).hasSize(5); + + this.template.batchOps().delete(books); + } + + @Configuration + @EnableIntegration + public static class Config extends IntegrationTestConfig { + + @Autowired + public ReactiveCassandraOperations template; + + @Bean + public MessageHandler cassandraMessageHandler1() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public PollableChannel messageChannel() { + return new NullChannel(); + } + + @Bean + public MessageHandler cassandraMessageHandler2() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + + WriteOptions options = + InsertOptions.builder() + .ttl(60) + .consistencyLevel(ConsistencyLevel.ONE) + .build(); + + cassandraMessageHandler.setWriteOptions(options); + cassandraMessageHandler.setOutputChannel(messageChannel()); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public MessageHandler cassandraMessageHandler3() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + String cqlIngest = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + cassandraMessageHandler.setIngestQuery(cqlIngest); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public FluxMessageChannel resultChannel() { + return new FluxMessageChannel(); + } + + @Bean + public MessageHandler cassandraMessageHandler4() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size"); + + Map params = new HashMap<>(); + params.put("author", PARSER.parseExpression("payload")); + params.put("size", PARSER.parseExpression("headers.limit")); + + cassandraMessageHandler.setParameterExpressions(params); + + cassandraMessageHandler.setOutputChannel(resultChannel()); + cassandraMessageHandler.setProducesReply(true); + return cassandraMessageHandler; + } + + } + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java new file mode 100644 index 00000000000..e1194f6349d --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.test.domain; + +import java.time.LocalDate; + +import org.springframework.data.cassandra.core.mapping.Indexed; +import org.springframework.data.cassandra.core.mapping.PrimaryKey; +import org.springframework.data.cassandra.core.mapping.Table; + + +/** + * Test POJO + * + * @author David Webb + * @author Artem Bilan + * + * @since 6.0 + */ +@Table("book") +public record Book( + @PrimaryKey String isbn, + String title, + @Indexed String author, + Integer pages, + LocalDate saleDate, + Boolean isInStock) { + +} diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java new file mode 100644 index 00000000000..1d2c13028cb --- /dev/null +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cassandra.test.domain; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * @author Filippo Balicchia + * @author Artem Bilan + * + * @since 6.0 + */ +public final class BookSampler { + + public static List getBookList(int numBooks) { + List books = new ArrayList<>(); + for (int i = 0; i < numBooks - 1; i++) { + books.add(new Book(UUID.randomUUID().toString(), "Spring Data Cassandra Guide", "Cassandra Guru puppy", + i * 10 + 5, LocalDate.now(), true)); + } + books.add(getBook()); + return books; + } + + public static Book getBook() { + return new Book("123456-1", "Spring Integration Cassandra", "Cassandra Guru", 521, LocalDate.now(), true); + } + + private BookSampler() { + } + +} diff --git a/spring-integration-cassandra/src/test/resources/log4j2-test.xml b/spring-integration-cassandra/src/test/resources/log4j2-test.xml new file mode 100644 index 00000000000..c916f70cee0 --- /dev/null +++ b/spring-integration-cassandra/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/src/reference/asciidoc/cassandra.adoc b/src/reference/asciidoc/cassandra.adoc new file mode 100644 index 00000000000..f814daad45b --- /dev/null +++ b/src/reference/asciidoc/cassandra.adoc @@ -0,0 +1,179 @@ +[[cassandra]] +== Apache Cassandra Support + +Spring Integration provides channel adapters (starting with version 6.0) for performing database operations against an Apache Cassandra cluster. +It is fully based on the https://spring.io/projects/spring-data-cassandra[Spring Data for Apache Cassandra] project. + +You need to include this dependency into your project: + +==== +[source, xml, subs="normal", role="primary"] +.Maven +---- + + org.springframework.integration + spring-integration-cassandra + {project-version} + +---- +[source, groovy, subs="normal", role="secondary"] +.Gradle +---- +compile "org.springframework.integration:spring-integration-cassandra:{project-version}" +---- +==== + +[[cassandra-outbound]] +=== Cassandra Outbound Components + +The `CassandraMessageHandler` is an `AbstractReplyProducingMessageHandler` implementation and can work in both one-way (default) and request-reply modes (a `producesReply` option). +It is asynchronous by default (`setAsync(false)` to reset) and performs reactive `INSERT`, `UPDATE`, `DELETE` or `STATEMENT` operations against the provided `ReactiveCassandraOperations`. +The type of operation can be configured via the `CassandraMessageHandler.Type` option. +The `ingestQuery` sets the mode into an `INSERT`; the `query` or `statementExpression`, or `statementProcessor` sets the mode into a `STATEMENT`. + +The following code snippets demonstrate various configuration for this channel adapter or gateway: + +==== +[source, java, role="primary"] +.Java DSL +---- +@Bean +IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) { + return flow -> flow + .handle(Cassandra.outboundGateway(cassandraOperations) + .query("SELECT * FROM book WHERE author = :author limit :size") + .parameter("author", "payload") + .parameter("size", m -> m.getHeaders().get("limit"))) + .channel(c -> c.flux("resultChannel")); +} +---- +[source, kotlin, role="secondary"] +.Kotlin DSL +---- +@Bean +fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) = + integrationFlow { + handle( + Cassandra.outboundChannelAdapter(cassandraOperations) + .statementExpression("T(QueryBuilder).truncate('book').build()") + ) { async(false) } + } +---- +[source, java, role="secondary"] +.Java +---- +@ServiceActivator(inputChannel = "cassandraSelectChannel") +@Bean +public MessageHandler cassandraMessageHandler() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size"); + + Map params = new HashMap<>(); + params.put("author", PARSER.parseExpression("payload")); + params.put("size", PARSER.parseExpression("headers.limit")); + + cassandraMessageHandler.setParameterExpressions(params); + + cassandraMessageHandler.setOutputChannel(resultChannel()); + cassandraMessageHandler.setProducesReply(true); + return cassandraMessageHandler; +} +---- +[source, xml, role="secondary"] +.XML +---- + + + + + + +---- +==== + +If a `CassandraMessageHandler` is used as a gateway in the default async mode, a `Mono` is produced, which is handled according to the provided `MessageChannel` implementation. +For true reactive processing a `FluxMessageChannel` is recommended for the output channel configuration. +In sync mode `Mono.block()` is called to obtain the reply value. + +If `INSERT`, `UPDATE` or `DELETE` operations are performed, an entity (marked `org.springframework.data.cassandra.core.mapping.Table`) is expected in the request message payload. +If the payload is a list of entities, then the respective batch operation is performed. + +The `ingestQuery` mode expects the payload to be present as a matrix of values to insert - `List>`. +For example, if the entity is like this: + +==== +[source,java] +---- +@Table("book") +public record Book(@PrimaryKey String isbn, + String title, + @Indexed String author, + int pages, + LocalDate saleDate, + boolean isInStock) { + +} +---- +==== + +And channel adapter has this configuration: + +==== +[source,java] +---- +@Bean +public MessageHandler cassandraMessageHandler3() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + cassandraMessageHandler.setIngestQuery(cqlIngest); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; +} +---- +==== + +The request message payload must be converted like this: + +==== +[source,java] +---- +List> ingestBooks = + payload.stream() + .map(book -> + List.of( + book.isbn(), + book.title(), + book.author(), + book.pages(), + book.saleDate(), + book.isInStock())) + .toList(); +---- +==== + +For more sophisticated use-cases, the payload can be as an instance of `com.datastax.oss.driver.api.core.cql.Statement`. +The `com.datastax.oss.driver.api.querybuilder.QueryBuilder` API is recommended to build various statements to execute against Apache Cassandra. +For example, to remove all the data from the `Book` table, a message with a payload like this can be sent to the `CassandraMessageHandler`: `QueryBuilder.truncate("book").build()`. +Alternatively, for logic based on a request message, a `statementExpression` or `statementProcessor` can be provided for the `CassandraMessageHandler` to build a `Statement` based on that message. +For convenience, a `com.datastax.oss.driver.api.querybuilder` is registered as an `import` into a SpEL evaluation context, so a target expression can be as simple as this: + +==== +[source,xml] +---- +statement-expression="T(QueryBuilder).selectFrom("book").all()" +---- +==== + +The `setParameterExpressions(Map parameterExpressions)` represents bindable named query parameters and is used only with a `setQuery(String query)` option. +See Java and XML samples mentioned above. diff --git a/src/reference/asciidoc/endpoint-summary.adoc b/src/reference/asciidoc/endpoint-summary.adoc index adea5554dd1..ddd078dd034 100644 --- a/src/reference/asciidoc/endpoint-summary.adoc +++ b/src/reference/asciidoc/endpoint-summary.adoc @@ -54,6 +54,12 @@ The following table summarizes the various endpoints with quick links to the app | <<./amqp.adoc#amqp-inbound-gateway,Inbound Gateway>> | <<./amqp.adoc#amqp-outbound-gateway,Outbound Gateway>> +| *Apache Cassandra* +| N +| <<./cassandra.adoc#cassandra-outbound,Outbound Channel Adapter>> +| N +| <<./cassandra.adoc#cassandra-outbound,Outbound Gateway>> + | *Events* | <<./event.adoc#appevent-inbound,Receiving Spring Application Events>> | <<./event.adoc#appevent-outbound,Sending Spring Application Events>> diff --git a/src/reference/asciidoc/index-single.adoc b/src/reference/asciidoc/index-single.adoc index 3b8b0917668..0ae67197e7c 100644 --- a/src/reference/asciidoc/index-single.adoc +++ b/src/reference/asciidoc/index-single.adoc @@ -41,6 +41,8 @@ include::./endpoint-summary.adoc[] include::./amqp.adoc[] +include::./cassandra.adoc[] + include::./event.adoc[] include::./feed.adoc[] diff --git a/src/reference/asciidoc/index.adoc b/src/reference/asciidoc/index.adoc index ddcb9cdaf8b..8aaf4ee6ba8 100644 --- a/src/reference/asciidoc/index.adoc +++ b/src/reference/asciidoc/index.adoc @@ -34,6 +34,7 @@ Welcome to the Spring Integration reference documentation! [horizontal] <<./endpoint-summary.adoc#spring-integration-endpoints,Integration Endpoint Summary>> :: Protocol-specific channel adapters and gateways summary <<./amqp.adoc#amqp,AMQP Support>> :: AMQP channels, adapters and gateways +<<./cassandra.adoc#cassandra,Apache Cassandra Support>> :: Apache Cassandra channel adapters <<./event.adoc#applicationevent,Spring `ApplicationEvent` Support>> :: Handling and consuming Spring application events with channel adapters <<./feed.adoc#feed,Feed Adapter>> :: RSS and Atom channel adapters <<./file.adoc#files,File Support>> :: Channel adapters and gateways for file system support diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index 65feb7c9746..ca86fa3643f 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -326,8 +326,7 @@ public class MainFlow { ---- ==== -Currently, Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>>, <<./r2dbc.adoc#r2dbc,R2DBC>>, <<./zeromq.adoc#zeromq,ZeroMQ>>, <<./graphql.adoc#graphql,GraphQL>>. +Currently, Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>>, <<./r2dbc.adoc#r2dbc,R2DBC>>, <<./zeromq.adoc#zeromq,ZeroMQ>>, <<./graphql.adoc#graphql,GraphQL>>, <<./cassandra.adoc#cassandra,Apache Cassandra>>. The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data. -Also, an https://github.com/spring-projects/spring-integration-extensions/tree/main/spring-integration-cassandra[Apache Cassandra Extension] provides a `MessageHandler` implementation for the Cassandra reactive driver. More reactive channel adapters are coming, for example for Apache Kafka in <<./kafka.adoc#kafka,Kafka>> based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from https://spring.io/projects/spring-kafka[Spring for Apache Kafka] etc. For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 3a4f5c53a2b..57db0f2fb61 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -70,6 +70,12 @@ The Scripting module now provides a `PolyglotScriptExecutor` implementation base JavaScript support is now based on this executor since its JSR223 implementation has been removed from Java by itself. See <<./scripting.adoc#scripting,Scripting Support>> for more information. +[[x6.0-cassandra]] +==== Apache Cassandra Support + +The Apache Cassandra Spring Integration Extensions project has been migrated as the `spring-integration-cassandra` module. +See <<./cassandra.adoc#cassandra,Apache Cassandra Support>> for more information. + [[x6.0-general]] === General Changes