Skip to content

Commit d8c378b

Browse files
artembilangaryrussell
authored andcommitted
GH-2788: Add MongoDbChangeStreamMessageProducer
Fixes #2788 * Introduce a `MessageProducerSupport.subscribeToPublisher(Publisher<Message<?>>)` for components which produces `Flux` for data from their source * Such a component is auto-stopped when subscription to that `Publisher` is canceled * Implement a `MongoDbChangeStreamMessageProducer` based on the reactive support for in Spring Data MongoDb * Implement a Java DSL for `MongoDbChangeStreamMessageProducer` * Disable a test for change stream since it requires server of version 4.x started with 'replSet' option * Add `MongoHeaders` for change stream events * Change `MessageProducerSupport` to use a `takeWhile((message) -> isRunning())` instead of storing a `subscription` from a callback * Document new features * Remove trailing whitespaces * Doc Polishing.
1 parent 2d7e473 commit d8c378b

File tree

11 files changed

+745
-16
lines changed

11 files changed

+745
-16
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,14 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import org.reactivestreams.Publisher;
20+
1921
import org.springframework.beans.factory.BeanFactory;
2022
import org.springframework.beans.factory.SmartInitializingSingleton;
2123
import org.springframework.core.AttributeAccessor;
2224
import org.springframework.integration.IntegrationPattern;
2325
import org.springframework.integration.IntegrationPatternType;
26+
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
2427
import org.springframework.integration.core.MessageProducer;
2528
import org.springframework.integration.core.MessagingTemplate;
2629
import org.springframework.integration.history.MessageHistory;
@@ -36,6 +39,8 @@
3639
import org.springframework.util.Assert;
3740
import org.springframework.util.StringUtils;
3841

42+
import reactor.core.publisher.Flux;
43+
3944
/**
4045
* A support class for producer endpoints that provides a setter for the
4146
* output channel and a convenience method for sending Messages.
@@ -176,15 +181,17 @@ protected void onInit() {
176181
}
177182

178183
/**
179-
* Takes no action by default. Subclasses may override this if they
184+
* Take no action by default.
185+
* Subclasses may override this if they
180186
* need lifecycle-managed behavior. Protected by 'lifecycleLock'.
181187
*/
182188
@Override
183189
protected void doStart() {
184190
}
185191

186192
/**
187-
* Takes no action by default. Subclasses may override this if they
193+
* Take no action by default.
194+
* Subclasses may override this if they
188195
* need lifecycle-managed behavior.
189196
*/
190197
@Override
@@ -196,13 +203,10 @@ protected void sendMessage(Message<?> messageArg) {
196203
if (message == null) {
197204
throw new MessagingException("cannot send a null message");
198205
}
199-
if (this.shouldTrack) {
200-
message = MessageHistory.write(message, this, getMessageBuilderFactory());
201-
}
206+
message = trackMessageIfAny(message);
202207
try {
203-
MessageChannel messageChannel = getOutputChannel();
204-
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
205-
this.messagingTemplate.send(messageChannel, message);
208+
MessageChannel outputChannel = getRequiredOutputChannel();
209+
this.messagingTemplate.send(outputChannel, message);
206210
}
207211
catch (RuntimeException ex) {
208212
if (!sendErrorMessageIfNecessary(message, ex)) {
@@ -211,14 +215,41 @@ protected void sendMessage(Message<?> messageArg) {
211215
}
212216
}
213217

218+
protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
219+
MessageChannel outputChannel = getRequiredOutputChannel();
220+
221+
Flux<? extends Message<?>> messageFlux =
222+
Flux.from(publisher)
223+
.map(this::trackMessageIfAny)
224+
.doOnComplete(this::stop)
225+
.doOnCancel(this::stop)
226+
.takeWhile((message) -> isRunning());
227+
228+
if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
229+
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(messageFlux);
230+
}
231+
else {
232+
messageFlux
233+
.doOnNext((message) -> {
234+
try {
235+
sendMessage(message);
236+
}
237+
catch (Exception ex) {
238+
logger.error("Error sending a message: " + message, ex);
239+
}
240+
})
241+
.subscribe();
242+
}
243+
}
244+
214245
/**
215246
* Send an error message based on the exception and message.
216247
* @param message the message.
217248
* @param exception the exception.
218249
* @return true if the error channel is available and message sent.
219250
* @since 4.3.10
220251
*/
221-
protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeException exception) {
252+
protected final boolean sendErrorMessageIfNecessary(Message<?> message, Exception exception) {
222253
MessageChannel channel = getErrorChannel();
223254
if (channel != null) {
224255
this.messagingTemplate.send(channel, buildErrorMessage(message, exception));
@@ -235,9 +266,8 @@ protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeE
235266
* @return the error message.
236267
* @since 4.3.10
237268
*/
238-
protected final ErrorMessage buildErrorMessage(Message<?> message, RuntimeException exception) {
239-
return this.errorMessageStrategy.buildErrorMessage(exception,
240-
getErrorMessageAttributes(message));
269+
protected final ErrorMessage buildErrorMessage(Message<?> message, Exception exception) {
270+
return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message));
241271
}
242272

243273
/**
@@ -252,4 +282,19 @@ protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
252282
return ErrorMessageUtils.getAttributeAccessor(message, null);
253283
}
254284

285+
private MessageChannel getRequiredOutputChannel() {
286+
MessageChannel messageChannel = getOutputChannel();
287+
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
288+
return messageChannel;
289+
}
290+
291+
private Message<?> trackMessageIfAny(Message<?> message) {
292+
if (this.shouldTrack) {
293+
return MessageHistory.write(message, this, getMessageBuilderFactory());
294+
}
295+
else {
296+
return message;
297+
}
298+
}
299+
255300
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.endpoint;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.context.annotation.Bean;
25+
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.integration.channel.FluxMessageChannel;
27+
import org.springframework.integration.config.EnableIntegration;
28+
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.support.GenericMessage;
30+
import org.springframework.test.annotation.DirtiesContext;
31+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
32+
33+
import reactor.core.publisher.Flux;
34+
import reactor.test.StepVerifier;
35+
36+
/**
37+
* @author Artem Bilan
38+
*
39+
* @since 5.3
40+
*/
41+
@SpringJUnitConfig
42+
@DirtiesContext
43+
public class ReactiveMessageProducerTests {
44+
45+
@Autowired
46+
public FluxMessageChannel fluxMessageChannel;
47+
48+
@Autowired
49+
public MessageProducerSupport producer;
50+
51+
@Test
52+
public void test() {
53+
assertThat(this.producer.isRunning()).isTrue();
54+
55+
StepVerifier.create(
56+
Flux.from(this.fluxMessageChannel)
57+
.map(Message::getPayload)
58+
.cast(String.class))
59+
.expectNext("test1", "test2")
60+
.thenCancel()
61+
.verify();
62+
63+
assertThat(this.producer.isRunning()).isFalse();
64+
}
65+
66+
@Configuration
67+
@EnableIntegration
68+
public static class Config {
69+
70+
@Bean
71+
public FluxMessageChannel fluxMessageChannel() {
72+
return new FluxMessageChannel();
73+
}
74+
75+
@Bean
76+
public MessageProducerSupport producer() {
77+
MessageProducerSupport producer =
78+
new MessageProducerSupport() {
79+
80+
@Override
81+
protected void doStart() {
82+
subscribeToPublisher(Flux.just("test1", "test2").map(GenericMessage::new));
83+
}
84+
85+
};
86+
producer.setOutputChannel(fluxMessageChannel());
87+
return producer;
88+
}
89+
90+
}
91+
92+
}

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.data.mongodb.core.query.Query;
2525
import org.springframework.expression.common.LiteralExpression;
2626
import org.springframework.integration.expression.ValueExpression;
27+
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;
2728

2829
/**
2930
* Factory class for building MongoDb components
@@ -140,6 +141,18 @@ public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(Rea
140141
return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query));
141142
}
142143

144+
/**
145+
* Create a {@link MongoDbChangeStreamMessageProducerSpec} builder instance
146+
* based on the provided {@link ReactiveMongoOperations}.
147+
* @param mongoOperations the {@link ReactiveMongoOperations} to use.
148+
* @return the {@link MongoDbChangeStreamMessageProducerSpec} instance
149+
* @since 5.3
150+
*/
151+
public static MongoDbChangeStreamMessageProducerSpec changeStreamInboundChannelAdapter(
152+
ReactiveMongoOperations mongoOperations) {
153+
154+
return new MongoDbChangeStreamMessageProducerSpec(new MongoDbChangeStreamMessageProducer(mongoOperations));
155+
}
143156

144157
private MongoDb() {
145158
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.dsl;
18+
19+
import org.springframework.data.mongodb.core.ChangeStreamOptions;
20+
import org.springframework.integration.dsl.MessageProducerSpec;
21+
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;
22+
23+
/**
24+
* A {@link MessageProducerSpec} for tne {@link MongoDbChangeStreamMessageProducer}.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 5.3
29+
*/
30+
public class MongoDbChangeStreamMessageProducerSpec
31+
extends MessageProducerSpec<MongoDbChangeStreamMessageProducerSpec, MongoDbChangeStreamMessageProducer> {
32+
33+
/**
34+
* Construct a builder based on an initial {@link MongoDbChangeStreamMessageProducerSpec}.
35+
* @param producer the {@link MongoDbChangeStreamMessageProducerSpec} to use.
36+
*/
37+
public MongoDbChangeStreamMessageProducerSpec(MongoDbChangeStreamMessageProducer producer) {
38+
super(producer);
39+
}
40+
41+
/**
42+
* Configure a domain type to convert change event body into.
43+
* @param domainType the type to use.
44+
* @return the spec.
45+
*/
46+
public MongoDbChangeStreamMessageProducerSpec domainType(Class<?> domainType) {
47+
this.target.setDomainType(domainType);
48+
return this;
49+
}
50+
51+
/**
52+
* Configure a collection to subscribe for change events.
53+
* @param collection the collection to use.
54+
* @return the spec.
55+
*/
56+
public MongoDbChangeStreamMessageProducerSpec collection(String collection) {
57+
this.target.setCollection(collection);
58+
return this;
59+
}
60+
61+
/**
62+
* Configure a {@link ChangeStreamOptions}.
63+
* @param options the {@link ChangeStreamOptions} to use.
64+
* @return the spec.
65+
*/
66+
public MongoDbChangeStreamMessageProducerSpec options(ChangeStreamOptions options) {
67+
this.target.setOptions(options);
68+
return this;
69+
}
70+
71+
/**
72+
* Configure a flag to extract body from a change event or use event as a payload.
73+
* @param extractBody to extract body or not.
74+
* @return the spec.
75+
*/
76+
public MongoDbChangeStreamMessageProducerSpec extractBody(boolean extractBody) {
77+
this.target.setExtractBody(extractBody);
78+
return this;
79+
}
80+
81+
}

0 commit comments

Comments
 (0)