@@ -253,28 +253,31 @@ private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator
253
253
}
254
254
255
255
private static BiConsumer <BackendMessage , SynchronousSink <BackendMessage >> handleReprepare (Sinks .Many <FrontendMessage > requests , ExtendedFlowOperator operator , MessageFactory messageFactory ) {
256
-
257
256
AtomicBoolean reprepared = new AtomicBoolean ();
258
257
259
258
return (message , sink ) -> {
260
259
261
- if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message ) && reprepared . compareAndSet ( false , true ) ) {
260
+ if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message )) {
262
261
263
262
operator .evictCachedStatement ();
264
263
265
- List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
266
- if (!messages .contains (Sync .INSTANCE )) {
267
- messages .add (0 , Sync .INSTANCE );
264
+ if (reprepared .compareAndSet (false , true )) {
265
+
266
+ List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
267
+ if (!messages .contains (Sync .INSTANCE )) {
268
+ messages .add (0 , Sync .INSTANCE );
269
+ }
270
+ requests .emitNext (new CompositeFrontendMessage (messages ), Sinks .EmitFailureHandler .FAIL_FAST );
271
+
272
+ return ;
268
273
}
269
- requests .emitNext (new CompositeFrontendMessage (messages ), Sinks .EmitFailureHandler .FAIL_FAST );
270
- } else {
271
- sink .next (message );
272
274
}
275
+
276
+ sink .next (message );
273
277
};
274
278
}
275
279
276
280
private static boolean requiresReprepare (ErrorResponse errorResponse ) {
277
-
278
281
ErrorDetails details = new ErrorDetails (errorResponse .getFields ());
279
282
String code = details .getCode ();
280
283
@@ -304,7 +307,7 @@ interface MessageFactory {
304
307
/**
305
308
* Operator to encapsulate common activity around the extended flow. Subclasses {@link AtomicInteger} to capture the number of ReadyForQuery frames.
306
309
*/
307
- static class ExtendedFlowOperator extends AtomicInteger {
310
+ static class ExtendedFlowOperator extends AtomicInteger implements Predicate < BackendMessage > {
308
311
309
312
private final String sql ;
310
313
@@ -328,7 +331,6 @@ public ExtendedFlowOperator(String sql, Binding binding, StatementCache cache, L
328
331
this .values = values ;
329
332
this .portal = portal ;
330
333
this .forceBinary = forceBinary ;
331
- set (1 );
332
334
}
333
335
334
336
public void close (Sinks .Many <FrontendMessage > requests ) {
@@ -337,9 +339,6 @@ public void close(Sinks.Many<FrontendMessage> requests) {
337
339
}
338
340
339
341
public void evictCachedStatement () {
340
-
341
- incrementAndGet ();
342
-
343
342
synchronized (this ) {
344
343
this .name = null ;
345
344
}
@@ -351,14 +350,16 @@ public void hydrateStatementCache() {
351
350
}
352
351
353
352
public Predicate <BackendMessage > takeUntil () {
354
- return m -> {
353
+ return this ;
354
+ }
355
355
356
- if (m instanceof ReadyForQuery ) {
357
- return decrementAndGet () <= 0 ;
358
- }
356
+ @ Override
357
+ public boolean test (BackendMessage backendMessage ) {
358
+ if (backendMessage instanceof ReadyForQuery ) {
359
+ return decrementAndGet () <= 0 ;
360
+ }
359
361
360
- return false ;
361
- };
362
+ return false ;
362
363
}
363
364
364
365
private boolean isPrepareRequired () {
@@ -376,6 +377,7 @@ public String getStatementName() {
376
377
}
377
378
378
379
public List <FrontendMessage .DirectEncoder > getMessages (Collection <FrontendMessage .DirectEncoder > append ) {
380
+ incrementAndGet ();
379
381
List <FrontendMessage .DirectEncoder > messagesToSend = new ArrayList <>(6 );
380
382
381
383
if (isPrepareRequired ()) {
0 commit comments