Skip to content

Commit bb2b7a4

Browse files
committed
Add support for Batch-level QueryOptions.
Closes #1192
1 parent 7d0a718 commit bb2b7a4

File tree

6 files changed

+102
-11
lines changed

6 files changed

+102
-11
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Collections;
1919

20+
import org.springframework.data.cassandra.core.cql.QueryOptions;
2021
import org.springframework.data.cassandra.core.cql.WriteOptions;
2122
import org.springframework.util.Assert;
2223

@@ -59,6 +60,16 @@ public interface CassandraBatchOperations {
5960
*/
6061
CassandraBatchOperations withTimestamp(long timestamp);
6162

63+
/**
64+
* Apply given {@link QueryOptions} to the whole batch statement.
65+
*
66+
* @param options the options to apply.
67+
* @return {@code this} {@link CassandraBatchOperations}.
68+
* @throws IllegalStateException if the batch was already executed.
69+
* @since 4.4
70+
*/
71+
CassandraBatchOperations withQueryOptions(QueryOptions options);
72+
6273
/**
6374
* Add a {@link BatchableStatement statement} to the batch.
6475
*

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.springframework.data.cassandra.core.convert.CassandraConverter;
2222
import org.springframework.data.cassandra.core.cql.QueryOptions;
23+
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
2324
import org.springframework.data.cassandra.core.cql.WriteOptions;
2425
import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
2526
import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
@@ -58,6 +59,8 @@ class CassandraBatchTemplate implements CassandraBatchOperations {
5859

5960
private final StatementFactory statementFactory;
6061

62+
private QueryOptions options = QueryOptions.empty();
63+
6164
/**
6265
* Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}.
6366
*
@@ -114,7 +117,8 @@ protected StatementFactory getStatementFactory() {
114117
public WriteResult execute() {
115118

116119
if (this.executed.compareAndSet(false, true)) {
117-
return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(batch.build()));
120+
BatchStatement statement = QueryOptionsUtil.addQueryOptions(batch.build(), this.options);
121+
return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(statement));
118122
}
119123

120124
throw new IllegalStateException("This Cassandra Batch was already executed");
@@ -130,9 +134,21 @@ public CassandraBatchOperations withTimestamp(long timestamp) {
130134
return this;
131135
}
132136

137+
@Override
138+
public CassandraBatchOperations withQueryOptions(QueryOptions options) {
139+
140+
assertNotExecuted();
141+
Assert.notNull(options, "QueryOptions must not be null");
142+
143+
this.options = options;
144+
145+
return this;
146+
}
147+
133148
@Override
134149
public CassandraBatchOperations addStatement(BatchableStatement<?> statement) {
135150

151+
assertNotExecuted();
136152
Assert.notNull(statement, "Statement must not be null");
137153

138154
this.batch.addStatement(statement);
@@ -143,6 +159,7 @@ public CassandraBatchOperations addStatement(BatchableStatement<?> statement) {
143159
@Override
144160
public CassandraBatchOperations addStatements(BatchableStatement<?>... statements) {
145161

162+
assertNotExecuted();
146163
Assert.notNull(statements, "Statements must not be null");
147164

148165
this.batch.addStatements(statements);
@@ -154,6 +171,7 @@ public CassandraBatchOperations addStatements(BatchableStatement<?>... statement
154171
@SuppressWarnings("unchecked")
155172
public CassandraBatchOperations addStatements(Iterable<? extends BatchableStatement<?>> statements) {
156173

174+
assertNotExecuted();
157175
Assert.notNull(statements, "Statements must not be null");
158176

159177
this.batch.addStatements((Iterable<BatchableStatement<?>>) statements);

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.reactivestreams.Subscriber;
2424

25+
import org.springframework.data.cassandra.core.cql.QueryOptions;
2526
import org.springframework.data.cassandra.core.cql.WriteOptions;
2627
import org.springframework.util.Assert;
2728

@@ -65,6 +66,16 @@ public interface ReactiveCassandraBatchOperations {
6566
*/
6667
ReactiveCassandraBatchOperations withTimestamp(long timestamp);
6768

69+
/**
70+
* Apply given {@link QueryOptions} to the whole batch statement.
71+
*
72+
* @param options the options to apply.
73+
* @return {@code this} {@link CassandraBatchOperations}.
74+
* @throws IllegalStateException if the batch was already executed.
75+
* @since 4.4
76+
*/
77+
ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options);
78+
6879
/**
6980
* Add a {@link BatchableStatement statement} to the batch.
7081
*

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.springframework.data.cassandra.core.convert.CassandraConverter;
2929
import org.springframework.data.cassandra.core.cql.QueryOptions;
30+
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
3031
import org.springframework.data.cassandra.core.cql.WriteOptions;
3132
import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
3233
import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
@@ -66,6 +67,8 @@ class ReactiveCassandraBatchTemplate implements ReactiveCassandraBatchOperations
6667

6768
private final StatementFactory statementFactory;
6869

70+
private QueryOptions options = QueryOptions.empty();
71+
6972
/**
7073
* Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}.
7174
*
@@ -141,7 +144,8 @@ public Mono<WriteResult> execute() {
141144

142145
this.batch.addStatements((List<BatchableStatement<?>>) statements);
143146

144-
return this.operations.getReactiveCqlOperations().queryForResultSet(this.batch.build());
147+
return this.operations.getReactiveCqlOperations()
148+
.queryForResultSet(QueryOptionsUtil.addQueryOptions(this.batch.build(), this.options));
145149
}) //
146150
.flatMap(resultSet -> resultSet.rows().collectList()
147151
.map(rows -> new WriteResult(resultSet.getAllExecutionInfo(), resultSet.wasApplied(), rows)));
@@ -160,9 +164,21 @@ public ReactiveCassandraBatchOperations withTimestamp(long timestamp) {
160164
return this;
161165
}
162166

167+
@Override
168+
public ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options) {
169+
170+
assertNotExecuted();
171+
Assert.notNull(options, "QueryOptions must not be null");
172+
173+
this.options = options;
174+
175+
return this;
176+
}
177+
163178
@Override
164179
public ReactiveCassandraBatchOperations addStatement(Mono<? extends BatchableStatement<?>> statement) {
165180

181+
assertNotExecuted();
166182
Assert.notNull(statement, "Statement mono must not be null");
167183

168184
this.batchMonos.add(statement.map(List::of));
@@ -174,6 +190,7 @@ public ReactiveCassandraBatchOperations addStatement(Mono<? extends BatchableSta
174190
public ReactiveCassandraBatchOperations addStatements(
175191
Mono<? extends Iterable<? extends BatchableStatement<?>>> statements) {
176192

193+
assertNotExecuted();
177194
Assert.notNull(statements, "Statements mono must not be null");
178195

179196
this.batchMonos.add(statements);

spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
2626

27+
import org.springframework.data.cassandra.CassandraConnectionFailureException;
28+
import org.springframework.data.cassandra.core.cql.QueryOptions;
2729
import org.springframework.data.cassandra.core.cql.WriteOptions;
2830
import org.springframework.data.cassandra.domain.FlatGroup;
2931
import org.springframework.data.cassandra.domain.Group;
3032
import org.springframework.data.cassandra.domain.GroupKey;
3133
import org.springframework.data.cassandra.repository.support.SchemaTestUtils;
3234
import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests;
3335

36+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
37+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
3438
import com.datastax.oss.driver.api.core.cql.BatchType;
3539
import com.datastax.oss.driver.api.core.cql.ResultSet;
3640
import com.datastax.oss.driver.api.core.cql.Row;
@@ -297,7 +301,19 @@ void shouldApplyTimestampToAllEntities() {
297301
}
298302
}
299303

300-
@Test // DATACASS-288
304+
@Test // GH-1192
305+
void shouldApplyQueryOptions() {
306+
307+
QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build();
308+
309+
CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED);
310+
CassandraBatchOperations ops = batchOperations.insert(walter).withQueryOptions(options);
311+
312+
assertThatExceptionOfType(CassandraConnectionFailureException.class).isThrownBy(ops::execute)
313+
.withRootCauseInstanceOf(AllNodesFailedException.class);
314+
}
315+
316+
@Test // GH-1192
301317
void shouldNotExecuteTwice() {
302318

303319
CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED);

spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.junit.jupiter.api.BeforeEach;
3131
import org.junit.jupiter.api.Test;
3232

33+
import org.springframework.data.cassandra.CassandraConnectionFailureException;
3334
import org.springframework.data.cassandra.ReactiveResultSet;
3435
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
36+
import org.springframework.data.cassandra.core.cql.QueryOptions;
3537
import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate;
3638
import org.springframework.data.cassandra.core.cql.WriteOptions;
3739
import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession;
@@ -41,6 +43,8 @@
4143
import org.springframework.data.cassandra.repository.support.SchemaTestUtils;
4244
import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests;
4345

46+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
47+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
4448
import com.datastax.oss.driver.api.core.cql.BatchType;
4549
import com.datastax.oss.driver.api.core.cql.Row;
4650
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
@@ -153,8 +157,7 @@ void shouldInsertCollectionOfEntitiesWithTtl() {
153157
.then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;"));
154158

155159
resultSet.flatMapMany(ReactiveResultSet::availableRows) //
156-
.collectList()
157-
.as(StepVerifier::create) //
160+
.collectList().as(StepVerifier::create) //
158161
.assertNext(rows -> {
159162

160163
for (Row row : rows) {
@@ -252,8 +255,7 @@ void shouldUpdateCollectionOfEntitiesWithTtl() {
252255
.then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;"));
253256

254257
resultSet.flatMapMany(ReactiveResultSet::availableRows) //
255-
.collectList()
256-
.as(StepVerifier::create) //
258+
.collectList().as(StepVerifier::create) //
257259
.assertNext(rows -> {
258260

259261
for (Row row : rows) {
@@ -388,6 +390,20 @@ void shouldApplyTimestampToAllEntities() {
388390
.assertNext(row -> assertThat(row.getLong(0)).isEqualTo(timestamp)).verifyComplete();
389391
}
390392

393+
@Test // GH-1192
394+
void shouldApplyQueryOptions() {
395+
396+
QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build();
397+
398+
ReactiveCassandraBatchOperations batchOperations = new ReactiveCassandraBatchTemplate(template, BatchType.LOGGED);
399+
Mono<WriteResult> execute = batchOperations.insert(walter).insert(mike).withQueryOptions(options).execute();
400+
401+
execute.as(StepVerifier::create).verifyErrorSatisfies(e -> {
402+
assertThat(e).isInstanceOf(CassandraConnectionFailureException.class)
403+
.hasRootCauseInstanceOf(AllNodesFailedException.class);
404+
});
405+
}
406+
391407
@Test // DATACASS-574
392408
void shouldNotExecuteTwice() {
393409

@@ -428,10 +444,12 @@ void shouldSupportMultithreadedMerge() {
428444

429445
for (int i = 0; i < 100; i++) {
430446

431-
batchOperations.insert(Mono.just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
432-
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
433-
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
434-
new Group(new GroupKey("users", "0x1", "walter" + random.longs())))).publishOn(Schedulers.boundedElastic()));
447+
batchOperations.insert(Mono
448+
.just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
449+
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
450+
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
451+
new Group(new GroupKey("users", "0x1", "walter" + random.longs()))))
452+
.publishOn(Schedulers.boundedElastic()));
435453
}
436454

437455
batchOperations.execute()

0 commit comments

Comments
 (0)