19
19
import reactor .core .publisher .Mono ;
20
20
21
21
import java .util .Map ;
22
- import java .util .Optional ;
23
22
import java .util .function .Function ;
23
+ import java .util .function .Supplier ;
24
24
25
25
import org .reactivestreams .Publisher ;
26
26
30
30
import org .springframework .data .cassandra .ReactiveSession ;
31
31
import org .springframework .data .cassandra .ReactiveSessionFactory ;
32
32
import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33
+ import org .springframework .data .util .Lazy ;
33
34
import org .springframework .lang .Nullable ;
34
35
import org .springframework .util .Assert ;
35
36
@@ -303,7 +304,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
303
304
304
305
Assert .notNull (action , "Callback object must not be null" );
305
306
306
- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
307
+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
307
308
}
308
309
309
310
// -------------------------------------------------------------------------
@@ -437,14 +438,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
437
438
Assert .notNull (statement , "CQL Statement must not be null" );
438
439
Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
439
440
441
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
442
+
440
443
return createFlux (statement , (session , stmt ) -> {
441
444
442
445
if (logger .isDebugEnabled ()) {
443
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
446
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
444
447
}
445
448
446
449
return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
447
- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
450
+ }).onErrorMap (translateException ("Query" , cql ));
448
451
}
449
452
450
453
/* (non-Javadoc)
@@ -504,20 +507,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
504
507
505
508
Assert .notNull (statement , "CQL Statement must not be null" );
506
509
510
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
511
+
507
512
return createMono (statement , (session , executedStatement ) -> {
508
513
509
514
if (logger .isDebugEnabled ()) {
510
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
515
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
511
516
}
512
517
513
518
return session .execute (applyStatementSettings (executedStatement ));
514
- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
519
+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
515
520
}
516
521
517
522
@ Override
518
523
public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
519
524
return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
520
- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
525
+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
521
526
}
522
527
523
528
// -------------------------------------------------------------------------
@@ -534,14 +539,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
534
539
Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
535
540
Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
536
541
542
+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
543
+
537
544
return createFlux (session -> {
538
545
539
546
if (logger .isDebugEnabled ()) {
540
- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
547
+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
541
548
}
542
549
543
550
return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
544
- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
551
+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
545
552
}
546
553
547
554
/* (non-Javadoc)
@@ -580,7 +587,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
580
587
: preparedStatement .bind ());
581
588
582
589
return session .execute (applyStatementSettings (boundStatement ));
583
- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
590
+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
584
591
}
585
592
586
593
/* (non-Javadoc)
@@ -820,7 +827,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
820
827
* @see CqlProvider
821
828
*/
822
829
protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
823
- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
830
+ return translateException (task , () -> cql );
831
+ }
832
+
833
+ /**
834
+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
835
+ *
836
+ * @param task readable text describing the task being attempted
837
+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
838
+ * @return the exception translation {@link Function}
839
+ * @since 3.2.10
840
+ * @see CqlProvider
841
+ */
842
+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
843
+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
824
844
: throwable ;
825
845
}
826
846
0 commit comments