1
1
/*
2
- * Copyright 2017-2019 the original author or authors.
2
+ * Copyright 2017-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -102,7 +102,7 @@ public final class ReactorNettyClient implements Client {
102
102
103
103
private final FluxSink <Publisher <FrontendMessage >> requests = this .requestProcessor .sink ();
104
104
105
- private final Queue <ResponseReceiver > responseReceivers = Queues .<ResponseReceiver >unbounded ().get ();
105
+ private final Queue <Conversation > conversations = Queues .<Conversation >unbounded ().get ();
106
106
107
107
private final DirectProcessor <NotificationResponse > notificationProcessor = DirectProcessor .create ();
108
108
@@ -139,11 +139,11 @@ private ReactorNettyClient(Connection connection) {
139
139
handleConnectionError (throwable );
140
140
})
141
141
.handle ((message , sink ) -> {
142
- ResponseReceiver receiver = this .responseReceivers .peek ();
142
+ Conversation receiver = this .conversations .peek ();
143
143
if (receiver != null ) {
144
144
if (receiver .takeUntil .test (message )) {
145
145
receiver .sink .complete ();
146
- this .responseReceivers .poll ();
146
+ this .conversations .poll ();
147
147
} else {
148
148
receiver .sink .next (message );
149
149
}
@@ -174,6 +174,7 @@ private ReactorNettyClient(Connection connection) {
174
174
175
175
@ Override
176
176
public Flux <BackendMessage > exchange (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests ) {
177
+ Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
177
178
Assert .requireNonNull (requests , "requests must not be null" );
178
179
179
180
return Flux
@@ -183,7 +184,7 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
183
184
return ;
184
185
}
185
186
synchronized (this ) {
186
- this .responseReceivers .add (new ResponseReceiver (sink , takeUntil ));
187
+ this .conversations .add (new Conversation (sink , takeUntil ));
187
188
this .requests .next (Flux .from (requests ).doOnNext (m -> {
188
189
if (!isConnected ()) {
189
190
sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
@@ -379,9 +380,9 @@ public Mono<Void> close() {
379
380
}
380
381
381
382
private void drainError (Supplier <? extends Throwable > supplier ) {
382
- ResponseReceiver receiver ;
383
+ Conversation receiver ;
383
384
384
- while ((receiver = this .responseReceivers .poll ()) != null ) {
385
+ while ((receiver = this .conversations .poll ()) != null ) {
385
386
receiver .sink .error (supplier .get ());
386
387
}
387
388
}
@@ -452,13 +453,19 @@ private void handleConnectionError(Throwable error) {
452
453
drainError (() -> new PostgresConnectionException (error ));
453
454
}
454
455
455
- private static class ResponseReceiver {
456
+ /**
457
+ * Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
458
+ * into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
459
+ * <p>
460
+ * A single conversation can make use of pipelining.
461
+ */
462
+ private static class Conversation {
456
463
457
464
private final FluxSink <BackendMessage > sink ;
458
465
459
466
private final Predicate <BackendMessage > takeUntil ;
460
467
461
- private ResponseReceiver (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
468
+ private Conversation (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
462
469
this .sink = sink ;
463
470
this .takeUntil = takeUntil ;
464
471
}
0 commit comments