52
52
import reactor .core .publisher .Flux ;
53
53
import reactor .core .publisher .FluxSink ;
54
54
import reactor .core .publisher .Mono ;
55
- import reactor .core .publisher .MonoSink ;
56
55
import reactor .core .publisher .SynchronousSink ;
57
56
import reactor .netty .Connection ;
58
57
import reactor .netty .resources .ConnectionProvider ;
76
75
import java .util .concurrent .atomic .AtomicInteger ;
77
76
import java .util .concurrent .atomic .AtomicReference ;
78
77
import java .util .function .Consumer ;
79
- import java .util .function .Function ;
78
+ import java .util .function .Predicate ;
80
79
import java .util .function .Supplier ;
81
80
82
81
import static io .r2dbc .postgresql .client .TransactionStatus .IDLE ;
@@ -104,7 +103,7 @@ public final class ReactorNettyClient implements Client {
104
103
105
104
private final FluxSink <FrontendMessage > requests = this .requestProcessor .sink ();
106
105
107
- private final Queue <MonoSink < Flux < BackendMessage >>> responseReceivers = Queues .<MonoSink < Flux < BackendMessage >> >unbounded ().get ();
106
+ private final Queue <ResponseReceiver > responseReceivers = Queues .<ResponseReceiver >unbounded ().get ();
108
107
109
108
private final DirectProcessor <NotificationResponse > notificationProcessor = DirectProcessor .create ();
110
109
@@ -140,21 +139,15 @@ private ReactorNettyClient(Connection connection) {
140
139
receiveError .set (throwable );
141
140
handleConnectionError (throwable );
142
141
})
143
- .windowWhile (it -> it .getClass () != ReadyForQuery .class )
144
- .doOnNext (fluxOfMessages -> {
145
- MonoSink <Flux <BackendMessage >> receiver = this .responseReceivers .poll ();
142
+ .handle ((message , sink ) -> {
143
+ ResponseReceiver receiver = this .responseReceivers .peek ();
146
144
if (receiver != null ) {
147
- receiver .success (fluxOfMessages .doOnComplete (() -> {
148
-
149
- Throwable throwable = receiveError .get ();
150
- if (throwable != null ) {
151
- throw new PostgresConnectionException (throwable );
152
- }
153
-
154
- if (!isConnected ()) {
155
- throw EXPECTED .get ();
156
- }
157
- }));
145
+ if (receiver .takeUntil .test (message )) {
146
+ receiver .sink .complete ();
147
+ this .responseReceivers .poll ();
148
+ } else {
149
+ receiver .sink .next (message );
150
+ }
158
151
}
159
152
})
160
153
.doOnComplete (this ::handleClose )
@@ -179,6 +172,50 @@ private ReactorNettyClient(Connection connection) {
179
172
.subscribe ();
180
173
}
181
174
175
+ @ Override
176
+ public Flux <BackendMessage > exchange (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests ) {
177
+ Assert .requireNonNull (requests , "requests must not be null" );
178
+
179
+ return Flux
180
+ .create (sink -> {
181
+
182
+ final AtomicInteger once = new AtomicInteger ();
183
+
184
+ Flux .from (requests )
185
+ .subscribe (message -> {
186
+
187
+ if (!isConnected ()) {
188
+ ReferenceCountUtil .safeRelease (message );
189
+ sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
190
+ return ;
191
+ }
192
+
193
+ if (once .get () == 0 && once .compareAndSet (0 , 1 )) {
194
+ synchronized (this ) {
195
+ this .responseReceivers .add (new ResponseReceiver (sink , takeUntil ));
196
+ this .requests .next (message );
197
+ }
198
+ } else {
199
+ this .requests .next (message );
200
+ }
201
+
202
+ }, this .requests ::error , () -> {
203
+
204
+ if (!isConnected ()) {
205
+ sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
206
+ }
207
+ });
208
+
209
+ });
210
+ }
211
+
212
+ @ Override
213
+ public void send (FrontendMessage message ) {
214
+ Assert .requireNonNull (message , "requests must not be null" );
215
+
216
+ this .requests .next (message );
217
+ }
218
+
182
219
private Mono <Void > resumeError (Throwable throwable ) {
183
220
184
221
handleConnectionError (throwable );
@@ -357,42 +394,12 @@ public Mono<Void> close() {
357
394
});
358
395
}
359
396
360
- @ Override
361
- public Flux <BackendMessage > exchange (Publisher <FrontendMessage > requests ) {
362
- Assert .requireNonNull (requests , "requests must not be null" );
363
-
364
- return Mono
365
- .<Flux <BackendMessage >>create (sink -> {
366
-
367
- final AtomicInteger once = new AtomicInteger ();
368
-
369
- Flux .from (requests )
370
- .subscribe (message -> {
371
-
372
- if (!isConnected ()) {
373
- ReferenceCountUtil .safeRelease (message );
374
- sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
375
- return ;
376
- }
377
-
378
- if (once .get () == 0 && once .compareAndSet (0 , 1 )) {
379
- synchronized (this ) {
380
- this .responseReceivers .add (sink );
381
- this .requests .next (message );
382
- }
383
- } else {
384
- this .requests .next (message );
385
- }
386
-
387
- }, this .requests ::error , () -> {
388
-
389
- if (!isConnected ()) {
390
- sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
391
- }
392
- });
397
+ private void drainError (Supplier <? extends Throwable > supplier ) {
398
+ ResponseReceiver receiver ;
393
399
394
- })
395
- .flatMapMany (Function .identity ());
400
+ while ((receiver = this .responseReceivers .poll ()) != null ) {
401
+ receiver .sink .error (supplier .get ());
402
+ }
396
403
}
397
404
398
405
@ Override
@@ -461,11 +468,15 @@ private void handleConnectionError(Throwable error) {
461
468
drainError (() -> new PostgresConnectionException (error ));
462
469
}
463
470
464
- private void drainError (Supplier <? extends Throwable > supplier ) {
465
- MonoSink <Flux <BackendMessage >> receiver ;
471
+ private static class ResponseReceiver {
466
472
467
- while ((receiver = this .responseReceivers .poll ()) != null ) {
468
- receiver .error (supplier .get ());
473
+ private final FluxSink <BackendMessage > sink ;
474
+
475
+ private final Predicate <BackendMessage > takeUntil ;
476
+
477
+ private ResponseReceiver (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
478
+ this .sink = sink ;
479
+ this .takeUntil = takeUntil ;
469
480
}
470
481
}
471
482
0 commit comments