20
20
21
21
import java .util .Map ;
22
22
import java .util .function .Function ;
23
+ import java .util .function .Supplier ;
23
24
24
25
import org .reactivestreams .Publisher ;
25
26
29
30
import org .springframework .data .cassandra .ReactiveSession ;
30
31
import org .springframework .data .cassandra .ReactiveSessionFactory ;
31
32
import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33
+ import org .springframework .data .util .Lazy ;
32
34
import org .springframework .lang .Nullable ;
33
35
import org .springframework .util .Assert ;
34
36
@@ -302,7 +304,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
302
304
303
305
Assert .notNull (action , "Callback object must not be null" );
304
306
305
- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
307
+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
306
308
}
307
309
308
310
// -------------------------------------------------------------------------
@@ -436,14 +438,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
436
438
Assert .notNull (statement , "CQL Statement must not be null" );
437
439
Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
438
440
441
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
442
+
439
443
return createFlux (statement , (session , stmt ) -> {
440
444
441
445
if (logger .isDebugEnabled ()) {
442
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
446
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
443
447
}
444
448
445
449
return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
446
- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
450
+ }).onErrorMap (translateException ("Query" , cql ));
447
451
}
448
452
449
453
/* (non-Javadoc)
@@ -503,20 +507,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
503
507
504
508
Assert .notNull (statement , "CQL Statement must not be null" );
505
509
510
+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
511
+
506
512
return createMono (statement , (session , executedStatement ) -> {
507
513
508
514
if (logger .isDebugEnabled ()) {
509
- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
515
+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
510
516
}
511
517
512
518
return session .execute (applyStatementSettings (executedStatement ));
513
- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
519
+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
514
520
}
515
521
516
522
@ Override
517
523
public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
518
524
return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
519
- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
525
+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
520
526
}
521
527
522
528
// -------------------------------------------------------------------------
@@ -533,14 +539,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
533
539
Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
534
540
Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
535
541
542
+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
543
+
536
544
return createFlux (session -> {
537
545
538
546
if (logger .isDebugEnabled ()) {
539
- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
547
+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
540
548
}
541
549
542
550
return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
543
- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
551
+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
544
552
}
545
553
546
554
/* (non-Javadoc)
@@ -579,7 +587,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
579
587
: preparedStatement .bind ());
580
588
581
589
return session .execute (applyStatementSettings (boundStatement ));
582
- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
590
+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
583
591
}
584
592
585
593
/* (non-Javadoc)
@@ -819,7 +827,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
819
827
* @see CqlProvider
820
828
*/
821
829
protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
822
- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
830
+ return translateException (task , () -> cql );
831
+ }
832
+
833
+ /**
834
+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
835
+ *
836
+ * @param task readable text describing the task being attempted
837
+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
838
+ * @return the exception translation {@link Function}
839
+ * @since 3.3.3
840
+ * @see CqlProvider
841
+ */
842
+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
843
+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
823
844
: throwable ;
824
845
}
825
846
0 commit comments