20
20
21
21
import java .util .Map ;
22
22
import java .util .function .Function ;
23
+ import java .util .function .Supplier ;
23
24
24
25
import org .reactivestreams .Publisher ;
25
26
29
30
import org .springframework .data .cassandra .ReactiveSession ;
30
31
import org .springframework .data .cassandra .ReactiveSessionFactory ;
31
32
import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33
+ import org .springframework .data .util .Lazy ;
32
34
import org .springframework .lang .Nullable ;
33
35
import org .springframework .util .Assert ;
34
36
@@ -299,7 +301,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
299
301
300
302
Assert .notNull (action , "Callback object must not be null" );
301
303
302
- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
304
+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
303
305
}
304
306
305
307
// -------------------------------------------------------------------------
@@ -394,14 +396,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
394
396
Assert .notNull (statement , "CQL Statement must not be null" );
395
397
Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
396
398
399
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
400
+
397
401
return createFlux (statement , (session , stmt ) -> {
398
402
399
403
if (logger .isDebugEnabled ()) {
400
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
404
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
401
405
}
402
406
403
407
return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
404
- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
408
+ }).onErrorMap (translateException ("Query" , cql ));
405
409
}
406
410
407
411
@ Override
@@ -440,20 +444,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
440
444
441
445
Assert .notNull (statement , "CQL Statement must not be null" );
442
446
447
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
448
+
443
449
return createMono (statement , (session , executedStatement ) -> {
444
450
445
451
if (logger .isDebugEnabled ()) {
446
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
452
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
447
453
}
448
454
449
455
return session .execute (applyStatementSettings (executedStatement ));
450
- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
456
+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
451
457
}
452
458
453
459
@ Override
454
460
public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
455
461
return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
456
- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
462
+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
457
463
}
458
464
459
465
// -------------------------------------------------------------------------
@@ -467,14 +473,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
467
473
Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
468
474
Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
469
475
476
+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
477
+
470
478
return createFlux (session -> {
471
479
472
480
if (logger .isDebugEnabled ()) {
473
- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
481
+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
474
482
}
475
483
476
484
return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
477
- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
485
+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
478
486
}
479
487
480
488
@ Override
@@ -510,7 +518,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
510
518
: preparedStatement .bind ());
511
519
512
520
return session .execute (applyStatementSettings (boundStatement ));
513
- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
521
+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
514
522
}
515
523
516
524
@ Override
@@ -696,7 +704,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
696
704
* @see CqlProvider
697
705
*/
698
706
protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
699
- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
707
+ return translateException (task , () -> cql );
708
+ }
709
+
710
+ /**
711
+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
712
+ *
713
+ * @param task readable text describing the task being attempted
714
+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
715
+ * @return the exception translation {@link Function}
716
+ * @since 3.3.3
717
+ * @see CqlProvider
718
+ */
719
+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
720
+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
700
721
: throwable ;
701
722
}
702
723
0 commit comments