29
29
import java .util .concurrent .Future ;
30
30
import java .util .concurrent .TimeUnit ;
31
31
import java .util .concurrent .atomic .AtomicBoolean ;
32
- import java .util .function .Function ;
33
32
import java .util .stream .Collectors ;
34
- import java .util .stream .IntStream ;
35
- import java .util .stream .StreamSupport ;
36
33
37
34
import org .junit .jupiter .api .Test ;
38
35
import org .reactivestreams .Publisher ;
39
36
40
- import org .springframework .aop .ThrowsAdvice ;
41
37
import org .springframework .beans .factory .annotation .Autowired ;
42
38
import org .springframework .beans .factory .annotation .Qualifier ;
43
39
import org .springframework .context .Lifecycle ;
44
40
import org .springframework .context .annotation .Bean ;
45
41
import org .springframework .context .annotation .Configuration ;
46
42
import org .springframework .integration .IntegrationMessageHeaderAccessor ;
47
- import org .springframework .integration .annotation .Reactive ;
48
- import org .springframework .integration .annotation .ServiceActivator ;
49
43
import org .springframework .integration .channel .QueueChannel ;
50
44
import org .springframework .integration .config .EnableIntegration ;
51
45
import org .springframework .integration .dsl .IntegrationFlow ;
57
51
import org .springframework .messaging .Message ;
58
52
import org .springframework .messaging .MessageChannel ;
59
53
import org .springframework .messaging .support .GenericMessage ;
60
- import org .springframework .scheduling .annotation .Schedules ;
61
54
import org .springframework .test .annotation .DirtiesContext ;
62
55
import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
63
56
64
- import io .vavr .collection .Stream ;
65
57
import reactor .core .publisher .Flux ;
66
58
import reactor .core .scheduler .Schedulers ;
67
59
@@ -123,7 +115,7 @@ void testReactiveFlow() throws Exception {
123
115
this .messageSource .start ();
124
116
assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
125
117
String [] strings = results .toArray (new String [0 ]);
126
- assertThat (strings ).isEqualTo (new String [] { "A" , "B" , "C" , "D" , "E" , "F" });
118
+ assertThat (strings ).isEqualTo (new String []{ "A" , "B" , "C" , "D" , "E" , "F" });
127
119
this .messageSource .stop ();
128
120
}
129
121
@@ -246,22 +238,6 @@ void fixedSubscriberChannelFlowTest() throws InterruptedException {
246
238
assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
247
239
}
248
240
249
- @ Autowired
250
- MessageChannel directChannel ;
251
-
252
- @ Test
253
- void testReactiveCustomizer () throws InterruptedException {
254
- Flux .fromStream (IntStream .range (1 , 100 ).boxed ())
255
-
256
- .subscribe (value -> System .out .println (Thread .currentThread ().getName () + ": " + value ));
257
-
258
- /*for (int i = 0; i < 100; i++) {
259
- this.directChannel.send(new GenericMessage<>(i));
260
- }
261
- */
262
- Thread .sleep (1000 );
263
- }
264
-
265
241
@ Configuration
266
242
@ EnableIntegration
267
243
public static class ContextConfiguration {
@@ -281,12 +257,15 @@ public Publisher<Message<String>> reactiveFlow() {
281
257
}
282
258
283
259
@ Bean
284
- public IntegrationFlow reactiveEndpointFlow () {
260
+ public Publisher < Message < Integer >> pollableReactiveFlow () {
285
261
return IntegrationFlows
286
262
.from ("inputChannel" )
263
+ .split (s -> s .delimiters ("," ))
287
264
.<String , Integer >transform (Integer ::parseInt ,
288
- e -> e .reactive (flux -> flux .publishOn (Schedulers .parallel ())))
289
- .get ();
265
+ e -> e .reactive (flux -> flux .publishOn (Schedulers .parallel ())).id ("reactiveTransformer" ))
266
+ .channel (MessageChannels .queue ())
267
+ .log ()
268
+ .toReactivePublisher ();
290
269
}
291
270
292
271
@ Bean
@@ -305,16 +284,6 @@ public Publisher<Message<String>> fixedSubscriberChannelFlow() {
305
284
.toReactivePublisher ();
306
285
}
307
286
308
- @ Bean
309
- public Function <Flux <?>, Flux <?>> publishOnCustomizer () {
310
- return flux -> flux .publishOn (Schedulers .parallel ());
311
- }
312
-
313
- @ ServiceActivator (inputChannel = "directChannel" , reactive = @ Reactive ("publishOnCustomizer" ))
314
- public void handleReactive (String payload ) {
315
- System .out .println (Thread .currentThread ().getName () + ": " + payload );
316
- }
317
-
318
287
}
319
288
320
289
}
0 commit comments