15
15
*/
16
16
package org .springframework .data .cassandra .core ;
17
17
18
+ import reactor .core .publisher .Flux ;
19
+ import reactor .core .publisher .Mono ;
20
+ import reactor .core .publisher .SynchronousSink ;
21
+
18
22
import java .util .Collections ;
19
23
import java .util .function .BiConsumer ;
20
24
import java .util .function .Function ;
21
25
22
- import com .datastax .oss .driver .api .core .CqlIdentifier ;
23
- import com .datastax .oss .driver .api .core .DriverException ;
24
- import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
25
- import com .datastax .oss .driver .api .core .context .DriverContext ;
26
- import com .datastax .oss .driver .api .core .cql .BatchType ;
27
- import com .datastax .oss .driver .api .core .cql .BoundStatement ;
28
- import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
29
- import com .datastax .oss .driver .api .core .cql .Row ;
30
- import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
31
- import com .datastax .oss .driver .api .core .cql .Statement ;
32
- import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
33
- import com .datastax .oss .driver .api .querybuilder .delete .Delete ;
34
- import com .datastax .oss .driver .api .querybuilder .insert .Insert ;
35
- import com .datastax .oss .driver .api .querybuilder .insert .RegularInsert ;
36
- import com .datastax .oss .driver .api .querybuilder .select .Select ;
37
- import com .datastax .oss .driver .api .querybuilder .truncate .Truncate ;
38
- import com .datastax .oss .driver .api .querybuilder .update .Update ;
39
26
import org .reactivestreams .Publisher ;
40
27
import org .slf4j .Logger ;
41
28
import org .slf4j .LoggerFactory ;
42
- import reactor .core .publisher .Flux ;
43
- import reactor .core .publisher .Mono ;
44
- import reactor .core .publisher .SynchronousSink ;
45
29
46
30
import org .springframework .beans .BeansException ;
47
31
import org .springframework .context .ApplicationContext ;
82
66
import org .springframework .lang .Nullable ;
83
67
import org .springframework .util .Assert ;
84
68
69
+ import com .datastax .oss .driver .api .core .CqlIdentifier ;
70
+ import com .datastax .oss .driver .api .core .DriverException ;
71
+ import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
72
+ import com .datastax .oss .driver .api .core .context .DriverContext ;
73
+ import com .datastax .oss .driver .api .core .cql .BatchType ;
74
+ import com .datastax .oss .driver .api .core .cql .BoundStatement ;
75
+ import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
76
+ import com .datastax .oss .driver .api .core .cql .Row ;
77
+ import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
78
+ import com .datastax .oss .driver .api .core .cql .Statement ;
79
+ import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
80
+ import com .datastax .oss .driver .api .querybuilder .delete .Delete ;
81
+ import com .datastax .oss .driver .api .querybuilder .insert .Insert ;
82
+ import com .datastax .oss .driver .api .querybuilder .insert .RegularInsert ;
83
+ import com .datastax .oss .driver .api .querybuilder .select .Select ;
84
+ import com .datastax .oss .driver .api .querybuilder .truncate .Truncate ;
85
+ import com .datastax .oss .driver .api .querybuilder .update .Update ;
86
+
85
87
/**
86
88
* Primary implementation of {@link ReactiveCassandraOperations}. It simplifies the use of Reactive Cassandra usage and
87
89
* helps to avoid common errors. It executes core Cassandra workflow. This class executes CQL queries or updates,
@@ -852,6 +854,19 @@ public ReactiveUpdate update(Class<?> domainType) {
852
854
// Implementation hooks and utility methods
853
855
// -------------------------------------------------------------------------
854
856
857
+ /**
858
+ * Create a new statement-based {@link ReactivePreparedStatementHandler} using the statement passed in.
859
+ * <p>
860
+ * This method allows for the creation to be overridden by subclasses.
861
+ *
862
+ * @param statement the statement to be prepared.
863
+ * @return the new {@link PreparedStatementHandler} to use.
864
+ * @since 3.3.3
865
+ */
866
+ protected ReactivePreparedStatementHandler createPreparedStatementHandler (Statement <?> statement ) {
867
+ return new PreparedStatementHandler (statement );
868
+ }
869
+
855
870
private <T > Mono <EntityWriteResult <T >> executeSave (T entity , CqlIdentifier tableName , SimpleStatement statement ) {
856
871
return executeSave (entity , tableName , statement , (writeResult , sink ) -> sink .next (writeResult ));
857
872
}
@@ -888,7 +903,7 @@ private <T> Flux<T> doQuery(Statement<?> statement, RowMapper<T> rowMapper) {
888
903
889
904
if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
890
905
891
- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
906
+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
892
907
return getReactiveCqlOperations ().query (statementHandler , statementHandler , rowMapper );
893
908
}
894
909
@@ -899,7 +914,7 @@ private <T> Mono<T> doExecute(Statement<?> statement, Function<ReactiveResultSet
899
914
900
915
if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
901
916
902
- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
917
+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
903
918
return getReactiveCqlOperations ()
904
919
.query (statementHandler , statementHandler , rs -> Mono .just (mappingFunction .apply (rs ))).next ();
905
920
}
@@ -912,7 +927,7 @@ private <T> Mono<T> doExecuteAndFlatMap(Statement<?> statement,
912
927
913
928
if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
914
929
915
- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
930
+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
916
931
return getReactiveCqlOperations ().query (statementHandler , statementHandler , mappingFunction ::apply ).next ();
917
932
}
918
933
@@ -948,9 +963,7 @@ public String getCql() {
948
963
}
949
964
}
950
965
951
- return getReactiveCqlOperations ()
952
- .execute (new GetConfiguredPageSize ())
953
- .single ();
966
+ return getReactiveCqlOperations ().execute (new GetConfiguredPageSize ()).single ();
954
967
}
955
968
956
969
@ SuppressWarnings ("unchecked" )
@@ -1020,14 +1033,26 @@ protected <T> Mono<T> maybeCallBeforeSave(T object, CqlIdentifier tableName, Sta
1020
1033
return Mono .just (object );
1021
1034
}
1022
1035
1036
+ /**
1037
+ * General callback interface used to create and bind prepared CQL statements.
1038
+ * <p>
1039
+ * This interface prepares the CQL statement and sets values on a {@link PreparedStatement} as union-type comprised
1040
+ * from {@link ReactivePreparedStatementCreator}, {@link PreparedStatementBinder}, and {@link CqlProvider}.
1041
+ *
1042
+ * @since 3.3.3
1043
+ */
1044
+ public interface ReactivePreparedStatementHandler
1045
+ extends ReactivePreparedStatementCreator , PreparedStatementBinder , CqlProvider {
1046
+
1047
+ }
1048
+
1023
1049
/**
1024
1050
* Utility class to prepare a {@link SimpleStatement} and bind values associated with the statement to a
1025
1051
* {@link BoundStatement}.
1026
1052
*
1027
1053
* @since 3.2
1028
1054
*/
1029
- private static class PreparedStatementHandler
1030
- implements ReactivePreparedStatementCreator , PreparedStatementBinder , CqlProvider {
1055
+ public static class PreparedStatementHandler implements ReactivePreparedStatementHandler {
1031
1056
1032
1057
private final SimpleStatement statement ;
1033
1058
0 commit comments