@@ -257,28 +257,31 @@ private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator
257
257
}
258
258
259
259
private static BiConsumer <BackendMessage , SynchronousSink <BackendMessage >> handleReprepare (FluxSink <FrontendMessage > requests , ExtendedFlowOperator operator , MessageFactory messageFactory ) {
260
-
261
260
AtomicBoolean reprepared = new AtomicBoolean ();
262
261
263
262
return (message , sink ) -> {
264
263
265
- if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message ) && reprepared . compareAndSet ( false , true ) ) {
264
+ if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message )) {
266
265
267
266
operator .evictCachedStatement ();
268
267
269
- List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
270
- if (!messages .contains (Sync .INSTANCE )) {
271
- messages .add (0 , Sync .INSTANCE );
268
+ if (reprepared .compareAndSet (false , true )) {
269
+
270
+ List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
271
+ if (!messages .contains (Sync .INSTANCE )) {
272
+ messages .add (0 , Sync .INSTANCE );
273
+ }
274
+ requests .next (new CompositeFrontendMessage (messages ));
275
+
276
+ return ;
272
277
}
273
- requests .next (new CompositeFrontendMessage (messages ));
274
- } else {
275
- sink .next (message );
276
278
}
279
+
280
+ sink .next (message );
277
281
};
278
282
}
279
283
280
284
private static boolean requiresReprepare (ErrorResponse errorResponse ) {
281
-
282
285
ErrorDetails details = new ErrorDetails (errorResponse .getFields ());
283
286
String code = details .getCode ();
284
287
@@ -308,7 +311,7 @@ interface MessageFactory {
308
311
/**
309
312
* Operator to encapsulate common activity around the extended flow. Subclasses {@link AtomicInteger} to capture the number of ReadyForQuery frames.
310
313
*/
311
- static class ExtendedFlowOperator extends AtomicInteger {
314
+ static class ExtendedFlowOperator extends AtomicInteger implements Predicate < BackendMessage > {
312
315
313
316
private final String sql ;
314
317
@@ -332,7 +335,6 @@ public ExtendedFlowOperator(String sql, Binding binding, StatementCache cache, L
332
335
this .values = values ;
333
336
this .portal = portal ;
334
337
this .forceBinary = forceBinary ;
335
- set (1 );
336
338
}
337
339
338
340
public void close (FluxSink <FrontendMessage > requests ) {
@@ -341,9 +343,6 @@ public void close(FluxSink<FrontendMessage> requests) {
341
343
}
342
344
343
345
public void evictCachedStatement () {
344
-
345
- incrementAndGet ();
346
-
347
346
synchronized (this ) {
348
347
this .name = null ;
349
348
}
@@ -355,14 +354,16 @@ public void hydrateStatementCache() {
355
354
}
356
355
357
356
public Predicate <BackendMessage > takeUntil () {
358
- return m -> {
357
+ return this ;
358
+ }
359
359
360
- if (m instanceof ReadyForQuery ) {
361
- return decrementAndGet () <= 0 ;
362
- }
360
+ @ Override
361
+ public boolean test (BackendMessage backendMessage ) {
362
+ if (backendMessage instanceof ReadyForQuery ) {
363
+ return decrementAndGet () <= 0 ;
364
+ }
363
365
364
- return false ;
365
- };
366
+ return false ;
366
367
}
367
368
368
369
private boolean isPrepareRequired () {
@@ -380,6 +381,7 @@ public String getStatementName() {
380
381
}
381
382
382
383
public List <FrontendMessage .DirectEncoder > getMessages (Collection <FrontendMessage .DirectEncoder > append ) {
384
+ incrementAndGet ();
383
385
List <FrontendMessage .DirectEncoder > messagesToSend = new ArrayList <>(6 );
384
386
385
387
if (isPrepareRequired ()) {
0 commit comments