17
17
18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
- import reactor .core .publisher .MonoProcessor ;
20
+ import reactor .core .publisher .Sinks ;
21
21
22
22
import java .nio .ByteBuffer ;
23
23
import java .util .Arrays ;
@@ -106,7 +106,7 @@ private Mono<Void> doDestroy() {
106
106
return Mono .empty ();
107
107
}
108
108
109
- ReactiveRedisConnection connection = this . connection ;
109
+ ReactiveRedisConnection connection = getRequiredConnection () ;
110
110
111
111
Flux <Void > terminationSignals = null ;
112
112
while (!subscriptions .isEmpty ()) {
@@ -136,7 +136,7 @@ private Mono<Void> doDestroy() {
136
136
public Collection <ReactiveSubscription > getActiveSubscriptions () {
137
137
138
138
return subscriptions .entrySet ().stream ().filter (entry -> entry .getValue ().hasRegistration ())
139
- .map (entry -> entry . getKey () ).collect (Collectors .toList ());
139
+ .map (Map . Entry :: getKey ).collect (Collectors .toList ());
140
140
}
141
141
142
142
/**
@@ -223,8 +223,8 @@ public <C, B> Flux<Message<C, B>> receive(Iterable<? extends Topic> topics, Seri
223
223
* subscription/unsubscription and can be used for synchronization.
224
224
*
225
225
* @param topics the channels to subscribe.
226
- * @param channelSerializer
227
- * @param messageSerializer
226
+ * @param channelSerializer serialization pair to decode the channel/pattern name.
227
+ * @param messageSerializer serialization pair to decode the message body.
228
228
* @param subscriptionListener listener to receive subscription/unsubscription notifications.
229
229
* @return the message stream.
230
230
* @see #receive(Iterable, SerializationPair, SerializationPair)
@@ -249,7 +249,7 @@ public <C, B> Flux<Message<C, B>> receive(Iterable<? extends Topic> topics, Seri
249
249
}
250
250
251
251
return doReceive (channelSerializer , messageSerializer ,
252
- connection .pubSubCommands ().createSubscription (subscriptionListener ), patterns ,
252
+ getRequiredConnection () .pubSubCommands ().createSubscription (subscriptionListener ), patterns ,
253
253
channels );
254
254
}
255
255
@@ -261,7 +261,7 @@ private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSeriali
261
261
262
262
Mono <Void > subscribe = subscribe (patterns , channels , it );
263
263
264
- MonoProcessor < ChannelMessage <ByteBuffer , ByteBuffer >> terminalProcessor = MonoProcessor . create ();
264
+ Sinks . One < Message <ByteBuffer , ByteBuffer >> terminalSink = Sinks . one ();
265
265
return it .receive ().mergeWith (subscribe .then (Mono .defer (() -> {
266
266
267
267
getSubscribers (it ).registered ();
@@ -272,9 +272,9 @@ private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSeriali
272
272
Subscribers subscribers = getSubscribers (it );
273
273
if (subscribers .unregister ()) {
274
274
subscriptions .remove (it );
275
- it .cancel ().subscribe (v -> terminalProcessor . onComplete (), terminalProcessor :: onError );
275
+ it .cancel ().subscribe (v -> terminalSink . tryEmitEmpty (), terminalSink :: tryEmitError );
276
276
}
277
- }).mergeWith (terminalProcessor );
277
+ }).mergeWith (terminalSink . asMono () );
278
278
});
279
279
280
280
return messageStream
@@ -303,7 +303,7 @@ private static Mono<Void> subscribe(ByteBuffer[] patterns, ByteBuffer[] channels
303
303
}
304
304
}
305
305
306
- return subscribe ;
306
+ return subscribe == null ? Mono . empty () : subscribe ;
307
307
}
308
308
309
309
private boolean isActive () {
@@ -330,13 +330,12 @@ private ByteBuffer[] getTargets(Iterable<? extends Topic> topics, Class<?> class
330
330
.toArray (ByteBuffer []::new );
331
331
}
332
332
333
- @ SuppressWarnings ("unchecked" )
334
333
private <C , B > Message <C , B > readMessage (RedisElementReader <C > channelSerializer ,
335
334
RedisElementReader <B > messageSerializer , Message <ByteBuffer , ByteBuffer > message ) {
336
335
337
336
if (message instanceof PatternMessage ) {
338
337
339
- PatternMessage <ByteBuffer , ByteBuffer , ByteBuffer > patternMessage = (PatternMessage ) message ;
338
+ PatternMessage <ByteBuffer , ByteBuffer , ByteBuffer > patternMessage = (PatternMessage < ByteBuffer , ByteBuffer , ByteBuffer > ) message ;
340
339
341
340
String pattern = read (stringSerializationPair .getReader (), patternMessage .getPattern ());
342
341
C channel = read (channelSerializer , patternMessage .getChannel ());
@@ -351,6 +350,17 @@ private <C, B> Message<C, B> readMessage(RedisElementReader<C> channelSerializer
351
350
return new ChannelMessage <>(channel , body );
352
351
}
353
352
353
+ private ReactiveRedisConnection getRequiredConnection () {
354
+
355
+ ReactiveRedisConnection connection = this .connection ;
356
+
357
+ if (connection == null ) {
358
+ throw new IllegalStateException ("Connection no longer available" );
359
+ }
360
+
361
+ return connection ;
362
+ }
363
+
354
364
private static <C > C read (RedisElementReader <C > reader , ByteBuffer buffer ) {
355
365
356
366
try {
0 commit comments