Skip to content

Commit 285c380

Browse files
authored
Add example to Spring Integration with Reactive Streams
* Add outbound channel adapter example * Add an example for the `CustomReactiveMessageHandler` usage * Emphasize that the examples are for Reactive Streams * Fix a missing letter that caused the text to get inside the code block * Emphasize that the first example is an event driven inbound channel adapter * Fix a redundant question mark from the example code
1 parent f6bb91c commit 285c380

File tree

1 file changed

+139
-4
lines changed

1 file changed

+139
-4
lines changed

src/reference/asciidoc/reactive-streams.adoc

+139-4
Original file line numberDiff line numberDiff line change
@@ -160,12 +160,147 @@ When the target protocol for integration provides a Reactive Streams solution, i
160160
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred `Mono` or `Flux` and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a `Mono` returned from the listener method.
161161
This way we have a reactive stream solution encapsulated exactly in this component.
162162
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
163-
This is not always available by the nature (or the current implementation) of `MessageHandler` processor used in the integration flow.
163+
164+
This is not always available by the nature (or with the current implementation) of `MessageHandler` processor used in the integration flow.
164165
This limitation can be handled using thread pools and queues or `FluxMessageChannel` (see above) before and after integration endpoints when there is no reactive implementation.
165166

166-
A reactive outbound channel adapter implementation is about initiation (or continuation) of a reactive stream to interaction with an external system according provided reactive API for the target protocol.
167-
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top.
168-
A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
167+
An example for a reactive **event-driven** inbound channel adapter:
168+
```java
169+
public class CustomReactiveMessageProducer extends MessageProducerSupport {
170+
171+
private final CustomReactiveSource customReactiveSource;
172+
173+
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
174+
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
175+
this.customReactiveSource = customReactiveSource;
176+
}
177+
178+
@Override
179+
protected void doStart() {
180+
Flux<Message<?>> messageFlux =
181+
this.customReactiveSource
182+
.map(event - >
183+
MessageBuilder
184+
.withPayload(event.getBody())
185+
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
186+
.build());
187+
188+
subscribeToPublisher(messageFlux);
189+
}
190+
}
191+
```
192+
193+
Usage would look like:
194+
195+
```java
196+
public class MainFlow {
197+
@Autowired
198+
private CustomReactiveMessageProducer customReactiveMessageProducer;
199+
200+
@Bean
201+
public IntegrationFlow buildFlow() {
202+
return IntegrationFlows.from(customReactiveMessageProducer)
203+
.channel(outputChannel)
204+
.get();
205+
}
206+
}
207+
```
208+
Or in a declarative way:
209+
210+
```java
211+
public class MainFlow {
212+
@Bean
213+
public IntegrationFlow buildFlow() {
214+
return IntegrationFlows.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
215+
.handle(outputChannel)
216+
.get();
217+
}
218+
}
219+
```
220+
Or even without a channel adapter, we can always use the Java DSL in the following way:
221+
```java
222+
public class MainFlow {
223+
@Bean
224+
public IntegrationFlow buildFlow() {
225+
Flux<Message<?>> myFlux = this.customReactiveSource
226+
.map(event - >
227+
MessageBuilder
228+
.withPayload(event.getBody())
229+
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
230+
.build());
231+
return IntegrationFlows.from(myFlux)
232+
.handle(outputChannel)
233+
.get();
234+
}
235+
}
236+
```
237+
238+
A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol.
239+
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top.
240+
A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
241+
242+
An example for a reactive outbound channel adapter:
243+
```java
244+
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
245+
246+
private final CustomEntityOperations customEntityOperations;
247+
248+
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
249+
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
250+
this.customEntityOperations = customEntityOperations;
251+
}
252+
253+
@Override
254+
protected Mono<Void> handleMessageInternal(Message<?> message) {
255+
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
256+
.flatMap(mode -> {
257+
switch (mode) {
258+
case INSERT:
259+
return handleInsert(message);
260+
case UPDATE:
261+
return handleUpdate(message);
262+
default:
263+
return Mono.error(new IllegalArgumentException());
264+
}
265+
}).then();
266+
}
267+
268+
private Mono<Void> handleInsert(Message<?> message) {
269+
return this.customEntityOperations.insert(message.getPayload())
270+
.then();
271+
}
272+
273+
private Mono<Void> handleUpdate(Message<?> message) {
274+
return this.r2dbcEntityOperations.update(message.getPayload())
275+
.then();
276+
}
277+
278+
public enum Type {
279+
INSERT,
280+
UPDATE,
281+
}
282+
}
283+
```
284+
285+
We will be able to use both of the channel adatpers:
286+
```java
287+
public class MainFlow {
288+
@Autowired
289+
private CustomReactiveMessageProducer customReactiveMessageProducer;
290+
291+
@Autowired
292+
private CustomReactiveMessageHandler customReactiveMessageHandler;
293+
294+
@Bean
295+
public IntegrationFlow buildFlow() {
296+
return IntegrationFlows.from(customReactiveMessageProducer)
297+
.transform(someOperation)
298+
.handle(customReactiveMessageHandler)
299+
.get();
300+
}
301+
}
302+
```
303+
169304

170305
Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>> and <<./r2dbc.adoc#r2dbc,R2DBC>>.
171306
The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.

0 commit comments

Comments
 (0)