diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index a2efe5d562a..160089ff2fa 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,14 @@ package org.springframework.integration.endpoint; +import org.reactivestreams.Publisher; + import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationPattern; import org.springframework.integration.IntegrationPatternType; +import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.history.MessageHistory; @@ -36,6 +39,8 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; + /** * A support class for producer endpoints that provides a setter for the * output channel and a convenience method for sending Messages. @@ -176,7 +181,8 @@ protected void onInit() { } /** - * Takes no action by default. Subclasses may override this if they + * Take no action by default. + * Subclasses may override this if they * need lifecycle-managed behavior. Protected by 'lifecycleLock'. */ @Override @@ -184,7 +190,8 @@ protected void doStart() { } /** - * Takes no action by default. Subclasses may override this if they + * Take no action by default. + * Subclasses may override this if they * need lifecycle-managed behavior. */ @Override @@ -196,13 +203,10 @@ protected void sendMessage(Message messageArg) { if (message == null) { throw new MessagingException("cannot send a null message"); } - if (this.shouldTrack) { - message = MessageHistory.write(message, this, getMessageBuilderFactory()); - } + message = trackMessageIfAny(message); try { - MessageChannel messageChannel = getOutputChannel(); - Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured"); - this.messagingTemplate.send(messageChannel, message); + MessageChannel outputChannel = getRequiredOutputChannel(); + this.messagingTemplate.send(outputChannel, message); } catch (RuntimeException ex) { if (!sendErrorMessageIfNecessary(message, ex)) { @@ -211,6 +215,33 @@ protected void sendMessage(Message messageArg) { } } + protected void subscribeToPublisher(Publisher> publisher) { + MessageChannel outputChannel = getRequiredOutputChannel(); + + Flux> messageFlux = + Flux.from(publisher) + .map(this::trackMessageIfAny) + .doOnComplete(this::stop) + .doOnCancel(this::stop) + .takeWhile((message) -> isRunning()); + + if (outputChannel instanceof ReactiveStreamsSubscribableChannel) { + ((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(messageFlux); + } + else { + messageFlux + .doOnNext((message) -> { + try { + sendMessage(message); + } + catch (Exception ex) { + logger.error("Error sending a message: " + message, ex); + } + }) + .subscribe(); + } + } + /** * Send an error message based on the exception and message. * @param message the message. @@ -218,7 +249,7 @@ protected void sendMessage(Message messageArg) { * @return true if the error channel is available and message sent. * @since 4.3.10 */ - protected final boolean sendErrorMessageIfNecessary(Message message, RuntimeException exception) { + protected final boolean sendErrorMessageIfNecessary(Message message, Exception exception) { MessageChannel channel = getErrorChannel(); if (channel != null) { this.messagingTemplate.send(channel, buildErrorMessage(message, exception)); @@ -235,9 +266,8 @@ protected final boolean sendErrorMessageIfNecessary(Message message, RuntimeE * @return the error message. * @since 4.3.10 */ - protected final ErrorMessage buildErrorMessage(Message message, RuntimeException exception) { - return this.errorMessageStrategy.buildErrorMessage(exception, - getErrorMessageAttributes(message)); + protected final ErrorMessage buildErrorMessage(Message message, Exception exception) { + return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message)); } /** @@ -252,4 +282,19 @@ protected AttributeAccessor getErrorMessageAttributes(Message message) { return ErrorMessageUtils.getAttributeAccessor(message, null); } + private MessageChannel getRequiredOutputChannel() { + MessageChannel messageChannel = getOutputChannel(); + Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured"); + return messageChannel; + } + + private Message trackMessageIfAny(Message message) { + if (this.shouldTrack) { + return MessageHistory.write(message, this, getMessageBuilderFactory()); + } + else { + return message; + } + } + } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java new file mode 100644 index 00000000000..fff06a4c5dd --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java @@ -0,0 +1,92 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +/** + * @author Artem Bilan + * + * @since 5.3 + */ +@SpringJUnitConfig +@DirtiesContext +public class ReactiveMessageProducerTests { + + @Autowired + public FluxMessageChannel fluxMessageChannel; + + @Autowired + public MessageProducerSupport producer; + + @Test + public void test() { + assertThat(this.producer.isRunning()).isTrue(); + + StepVerifier.create( + Flux.from(this.fluxMessageChannel) + .map(Message::getPayload) + .cast(String.class)) + .expectNext("test1", "test2") + .thenCancel() + .verify(); + + assertThat(this.producer.isRunning()).isFalse(); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public FluxMessageChannel fluxMessageChannel() { + return new FluxMessageChannel(); + } + + @Bean + public MessageProducerSupport producer() { + MessageProducerSupport producer = + new MessageProducerSupport() { + + @Override + protected void doStart() { + subscribeToPublisher(Flux.just("test1", "test2").map(GenericMessage::new)); + } + + }; + producer.setOutputChannel(fluxMessageChannel()); + return producer; + } + + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java index c455ac6b47e..c46de2c582f 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java @@ -24,6 +24,7 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer; /** * Factory class for building MongoDb components @@ -140,6 +141,18 @@ public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(Rea return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query)); } + /** + * Create a {@link MongoDbChangeStreamMessageProducerSpec} builder instance + * based on the provided {@link ReactiveMongoOperations}. + * @param mongoOperations the {@link ReactiveMongoOperations} to use. + * @return the {@link MongoDbChangeStreamMessageProducerSpec} instance + * @since 5.3 + */ + public static MongoDbChangeStreamMessageProducerSpec changeStreamInboundChannelAdapter( + ReactiveMongoOperations mongoOperations) { + + return new MongoDbChangeStreamMessageProducerSpec(new MongoDbChangeStreamMessageProducer(mongoOperations)); + } private MongoDb() { } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDbChangeStreamMessageProducerSpec.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDbChangeStreamMessageProducerSpec.java new file mode 100644 index 00000000000..86e764d0d6d --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDbChangeStreamMessageProducerSpec.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.dsl; + +import org.springframework.data.mongodb.core.ChangeStreamOptions; +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer; + +/** + * A {@link MessageProducerSpec} for tne {@link MongoDbChangeStreamMessageProducer}. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class MongoDbChangeStreamMessageProducerSpec + extends MessageProducerSpec { + + /** + * Construct a builder based on an initial {@link MongoDbChangeStreamMessageProducerSpec}. + * @param producer the {@link MongoDbChangeStreamMessageProducerSpec} to use. + */ + public MongoDbChangeStreamMessageProducerSpec(MongoDbChangeStreamMessageProducer producer) { + super(producer); + } + + /** + * Configure a domain type to convert change event body into. + * @param domainType the type to use. + * @return the spec. + */ + public MongoDbChangeStreamMessageProducerSpec domainType(Class domainType) { + this.target.setDomainType(domainType); + return this; + } + + /** + * Configure a collection to subscribe for change events. + * @param collection the collection to use. + * @return the spec. + */ + public MongoDbChangeStreamMessageProducerSpec collection(String collection) { + this.target.setCollection(collection); + return this; + } + + /** + * Configure a {@link ChangeStreamOptions}. + * @param options the {@link ChangeStreamOptions} to use. + * @return the spec. + */ + public MongoDbChangeStreamMessageProducerSpec options(ChangeStreamOptions options) { + this.target.setOptions(options); + return this; + } + + /** + * Configure a flag to extract body from a change event or use event as a payload. + * @param extractBody to extract body or not. + * @return the spec. + */ + public MongoDbChangeStreamMessageProducerSpec extractBody(boolean extractBody) { + this.target.setExtractBody(extractBody); + return this; + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducer.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducer.java new file mode 100644 index 00000000000..7b1eedd00bb --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducer.java @@ -0,0 +1,132 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.inbound; + +import org.bson.Document; +import org.reactivestreams.Publisher; + +import org.springframework.data.mongodb.core.ChangeStreamEvent; +import org.springframework.data.mongodb.core.ChangeStreamOptions; +import org.springframework.data.mongodb.core.ReactiveMongoOperations; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mongodb.support.MongoHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +import reactor.core.publisher.Flux; + +/** + * A {@link MessageProducerSupport} for MongoDB Change Stream implementation. + * The functionality is based on the + * {@link ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)} + * and {@link MessageProducerSupport#subscribeToPublisher(Publisher)} consumption. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class MongoDbChangeStreamMessageProducer extends MessageProducerSupport { + + private final ReactiveMongoOperations mongoOperations; + + private Class domainType = Document.class; + + @Nullable + private String collection; + + private ChangeStreamOptions options = ChangeStreamOptions.empty(); + + private boolean extractBody = true; + + /** + * Create an instance based on the provided {@link ReactiveMongoOperations}. + * @param mongoOperations the {@link ReactiveMongoOperations} to use. + * @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class) + */ + public MongoDbChangeStreamMessageProducer(ReactiveMongoOperations mongoOperations) { + Assert.notNull(mongoOperations, "'mongoOperations' must not be null"); + this.mongoOperations = mongoOperations; + } + + /** + * Specify an object type to convert an event body to. + * Defaults to {@link Document} class. + * @param domainType the class for event body conversion. + * @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class) + */ + public void setDomainType(Class domainType) { + Assert.notNull(domainType, "'domainType' must not be null"); + this.domainType = domainType; + } + + /** + * Specify a collection name to track change events from. + * By default tracks all the collection in the {@link #mongoOperations} configured database. + * @param collection a collection to use. + * @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class) + */ + public void setCollection(String collection) { + this.collection = collection; + } + + /** + * Specify a {@link ChangeStreamOptions}. + * @param options the {@link ChangeStreamOptions} to use. + * @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class) + */ + public void setOptions(ChangeStreamOptions options) { + Assert.notNull(options, "'options' must not be null"); + this.options = options; + } + + /** + * Configure this channel adapter to build a {@link Message} to produce + * with a payload based on a {@link ChangeStreamEvent#getBody()} (by default) + * or use a whole {@link ChangeStreamEvent} as a payload. + * @param extractBody to extract {@link ChangeStreamEvent#getBody()} or not. + */ + public void setExtractBody(boolean extractBody) { + this.extractBody = extractBody; + } + + @Override + public String getComponentType() { + return "mongo:change-stream-inbound-channel-adapter"; + } + + @Override + protected void doStart() { + Flux> changeStreamFlux = + this.mongoOperations.changeStream(this.collection, this.options, this.domainType) + .map(event -> + MessageBuilder + .withPayload( + !this.extractBody || event.getBody() == null + ? event + : event.getBody()) + .setHeader(MongoHeaders.COLLECTION_NAME, event.getCollectionName()) + .setHeader(MongoHeaders.CHANGE_STREAM_OPERATION_TYPE, event.getOperationType()) + .setHeader(MongoHeaders.CHANGE_STREAM_TIMESTAMP, event.getTimestamp()) + .setHeader(MongoHeaders.CHANGE_STREAM_RESUME_TOKEN, event.getResumeToken()) + .build()); + + subscribeToPublisher(changeStreamFlux); + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/support/MongoHeaders.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/support/MongoHeaders.java index ae9c6bf7074..dd6a6b61fb2 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/support/MongoHeaders.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/support/MongoHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ * for dealing with headers required by Mongo components * * @author Gary Russell + * @author Artem Bilan * * @since 2.2 */ @@ -29,8 +30,38 @@ public final class MongoHeaders { private MongoHeaders() { } + /** + * A prefix for MongoDb-specific message headers. + */ public static final String PREFIX = "mongo_"; + /** + * The prefix for change stream event headers. + * @since 5.3 + */ + public static final String PREFIX_CHANGE_STREAM = PREFIX + "changeStream_"; + + /** + * The header for MongoDb collection name. + */ public static final String COLLECTION_NAME = PREFIX + "collectionName"; + /** + * The header for change stream event type. + * @since 5.3 + */ + public static final String CHANGE_STREAM_OPERATION_TYPE = PREFIX_CHANGE_STREAM + "operationType"; + + /** + * The header for change stream event timestamp. + * @since 5.3 + */ + public static final String CHANGE_STREAM_TIMESTAMP = PREFIX_CHANGE_STREAM + "timestamp"; + + /** + * The header for change stream event resume token. + * @since 5.3 + */ + public static final String CHANGE_STREAM_RESUME_TOKEN = PREFIX_CHANGE_STREAM + "resumeToken"; + } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/Person.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/Person.java new file mode 100644 index 00000000000..fb8d08d67ca --- /dev/null +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/Person.java @@ -0,0 +1,115 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb; + +import org.bson.types.ObjectId; + +/** + * @author Christoph Strobl + * @author Artem Bilan + * + * @since 5.3 + */ +public class Person { + + private ObjectId id; + + private String firstName; + + private int age; + + private Person friend; + + public Person() { + this.id = new ObjectId(); + } + + @Override + public String toString() { + return "Person [id=" + this.id + ", firstName=" + this.firstName + ", age=" + this.age + + ", friend=" + this.friend + "]"; + } + + public Person(ObjectId id, String firstName) { + this.id = id; + this.firstName = firstName; + } + + public Person(String firstName, int age) { + this(); + this.firstName = firstName; + this.age = age; + } + + public Person(String firstName) { + this(); + this.firstName = firstName; + } + + public ObjectId getId() { + return this.id; + } + + public String getFirstName() { + return this.firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public int getAge() { + return this.age; + } + + public void setAge(int age) { + this.age = age; + } + + public Person getFriend() { + return this.friend; + } + + public void setFriend(Person friend) { + this.friend = friend; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + if (!(getClass().equals(obj.getClass()))) { + return false; + } + + Person that = (Person) obj; + + return this.id != null && this.id.equals(that.id); + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } + +} diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducerTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducerTests.java new file mode 100644 index 00000000000..e232ad74ba0 --- /dev/null +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducerTests.java @@ -0,0 +1,173 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.inbound; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.ChangeStreamEvent; +import org.springframework.data.mongodb.core.ReactiveMongoOperations; +import org.springframework.data.mongodb.core.ReactiveMongoTemplate; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mongodb.Person; +import org.springframework.integration.mongodb.dsl.MongoDb; +import org.springframework.integration.mongodb.support.MongoHeaders; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.mongodb.client.model.changestream.OperationType; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoClients; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +/** + * @author Artem Bilan + * + * @since 5.3 + */ +@Disabled("Requires MongoDb server of version 4.x started with 'replSet' option.") +@SpringJUnitConfig +@DirtiesContext +public class MongoDbChangeStreamMessageProducerTests { + + private static final String CONNECTION_STRING = + "mongodb://127.0.0.1:27017/?replicaSet=rs0&w=majority&uuidrepresentation=javaLegacy"; + + private static MongoClient mongoClient; + + @Autowired + ReactiveMongoOperations mongoTemplate; + + @Autowired + FluxMessageChannel fluxMessageChannel; + + @Autowired + MessageProducerSupport changeStreamMessageProducer; + + @BeforeAll + static void setup() { + mongoClient = MongoClients.create(CONNECTION_STRING); + } + + @AfterAll + static void tearDown() { + mongoClient.close(); + } + + @BeforeEach + void prepare() { + this.mongoTemplate.remove(Person.class).all() + .then() + .onErrorResume(ex -> this.mongoTemplate.createCollection(Person.class).then()) + .block(Duration.ofSeconds(10)); + } + + @AfterEach + void cleanUp() { + this.mongoTemplate.remove(Person.class).all() + .block(Duration.ofSeconds(10)); + } + + @Test + void testChangeStreamMessageProducer() { + Person person1 = new Person("John", 38); + Person person2 = new Person("Josh", 39); + Person person3 = new Person("Jack", 37); + + this.changeStreamMessageProducer.start(); + + StepVerifier stepVerifier = + Flux.from(this.fluxMessageChannel) + .as(StepVerifier::create) + .assertNext((message) -> { + assertThat(message.getPayload()) + .isInstanceOf(ChangeStreamEvent.class) + .extracting("body") + .asInstanceOf(InstanceOfAssertFactories.type(Person.class)) + .isEqualTo(person1); + assertThat(message.getHeaders()) + .containsEntry(MongoHeaders.COLLECTION_NAME, "person") + .containsEntry(MongoHeaders.CHANGE_STREAM_OPERATION_TYPE, OperationType.INSERT) + .containsKeys(MongoHeaders.CHANGE_STREAM_TIMESTAMP, + MongoHeaders.CHANGE_STREAM_RESUME_TOKEN); + }) + .assertNext((message) -> + assertThat(message.getPayload()) + .extracting("body") + .isEqualTo(person2)) + .assertNext((message) -> + assertThat(message.getPayload()) + .extracting("body") + .isEqualTo(person3)) + .thenCancel() + .verifyLater(); + + Flux.concat(this.mongoTemplate.insert(person1), + this.mongoTemplate.insert(person2), + this.mongoTemplate.insert(person3)) + .as(StepVerifier::create) + .expectNextCount(3) + .verifyComplete(); + + stepVerifier.verify(Duration.ofSeconds(10)); + + this.changeStreamMessageProducer.stop(); + } + + @Configuration + @EnableIntegration + static class Config { + + @Bean + ReactiveMongoOperations mongoTemplate() { + return new ReactiveMongoTemplate(mongoClient, "test"); + } + + @Bean + IntegrationFlow changeStreamFlow() { + return IntegrationFlows.from( + MongoDb.changeStreamInboundChannelAdapter(mongoTemplate()) + .domainType(Person.class) + .collection("person") + .extractBody(false) + .autoStartup(false) + .shouldTrack(true)) + .channel(MessageChannels.flux()) + .get(); + } + + } + +} diff --git a/src/reference/asciidoc/mongodb.adoc b/src/reference/asciidoc/mongodb.adoc index 67a8c3827ce..540fc7cd073 100644 --- a/src/reference/asciidoc/mongodb.adoc +++ b/src/reference/asciidoc/mongodb.adoc @@ -348,6 +348,37 @@ If the result of an expression is null or void, no message is generated. For more information about transaction synchronization, see <<./transactions.adoc#transaction-synchronization,Transaction Synchronization>>. +[[mongodb-change-stream-channel-adapter]] +=== MongoDB Change Stream Inbound Channel Adapter + +Starting with version 5.3, `spring-integration-mongodb` modules introduces a `MongoDbChangeStreamMessageProducer` - a reactive `MessageProducerSupport` implementation for Spring Data `ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)` API. +This component produces a `Flux` of messages with a `body` of `ChangeStreamEvent` as a payload by default and some change stream related headers (see `MongoHeaders`). +It is recommended to combine this `MongoDbChangeStreamMessageProducer` with a `FluxMessageChannel` as an `outputChannel` for on demand subscription and events consumption downstream. + +The Java DSL configuration for this channel adapter may look like this: + +==== +[source,java] +---- +@Bean +IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) { + return IntegrationFlows.from( + MongoDb.changeStreamInboundChannelAdapter(mongoTemplate) + .domainType(Person.class) + .collection("person") + .extractBody(false)) + .channel(MessageChannels.flux()) + .get(); +} +---- +==== + +When the `MongoDbChangeStreamMessageProducer` is stopped, or subscription is cancelled downstream, or MongoDb change stream produces an `OperationType.INVALIDATE`, the `Publisher` is completed. +The channel adapter can be started again and a new `Publisher` of source data is created and it is automatically subscribed in the `MessageProducerSupport.subscribeToPublisher(Publisher>)`. +This channel adapter can be reconfigured for new options in between if there is a requirement to consume change stream events from other places. + +See more information about change stream support in Spring Data MongoDb https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#change-streams[documentation]. + [[mongodb-outbound-channel-adapter]] === MongoDB Outbound Channel Adapter diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index 4889aa04677..a9bbe4ff330 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -77,6 +77,18 @@ This way, any `MessageSource` implementation can be turned into a reactive hot s See <<./polling-consumer.adoc#polling-consumer,Polling Consumer>> for more information. +=== Event-Driven Channel Adapter + +The `MessageProducerSupport` is a base class for event-driven channel adapters and typically its `sendMessage(Message)` is used as a listener callback in the producing driver API. +This callback can also be easily plugged into the `doOnNext()` Reactor operator when a message producer implementation builds a `Flux` of messages instead of listener-based functionality. +In fact, this is done in the framework when an `outputChannel` of the message producer is not a `ReactiveStreamsSubscribableChannel`. +However, for better end-user experience and allow to have more back-pressure ready functionality, the `MessageProducerSupport` provides a `subscribeToPublisher(Publisher>)` API to be used in the target implementation when a `Publisher>>` is a source of data from the target system. +Typically it is used from the `doStart()` implementation when target driver API is called for a `Publisher` of source data. +It is recommended to combine a reactive `MessageProducerSupport` implementation with a `FluxMessageChannel` as an `outputChannel` for on demand subscription and events consumption downstream. +The channel adapter goes to stopped state when a subscription to the `Publisher` is cancelled. +Calling `stop()` on such a channel adapter completes a producing from the source `Publisher`. +The channel adapter can be restarted with automatic subscription to a newly created source `Publisher`. + === Splitter and Aggregator When an `AbstractMessageSplitter` gets a `Publisher` for its logic, the process goes naturally over the items in the `Publisher` to map them into messages for sending to the `outputChannel`. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 0403e500fac..4839d92b691 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -55,8 +55,9 @@ See <<./handler-advice.adoc#handle-message-advice,Handling Message Advice>> for [[x5.3-mongodb-reactive-channel-adapters]] ==== MongoDB Reactive Channel Adapters -`spring-integration-mongodb` module now provides channel adapter implementations for Reactive MongoDB driver support in Spring Data. -See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information. +`spring-integration-mongodb` module now provides channel adapter implementations for Reactive MongoDb driver support in Spring Data. +Also a reactive implementation for MongoDb change stream support is present with a `MongoDbChangeStreamMessageProducer`. +See <<./mongodb.adoc#mongodb,MongoDB Support>> for more information. [[x5.3-general]] === General Changes @@ -81,6 +82,9 @@ See also <<./transactions.adoc#reactive-transactions,Reactive Transactions>>. A new `intercept()` operator to register `ChannelInterceptor` instances without creating explicit channels was added into Java DSL. See <<./dsl.adoc#java-dsl-intercept,Operator intercept()>> for more information. +The `MessageProducerSupport` now has a `subscribeToPublisher(Publisher>)` API to allow to implement message-driven producer endpoints which emit messages via reactive `Publisher`. +See <<./reactive-streams.adoc#reactive-streams,Reactive Streams Support>> for more information. + [[x5.3-amqp]] === AMQP Changes