Skip to content

Commit 799fec7

Browse files
committed
DATACOUCH-538 - Add scan consistency to analytics query template
1 parent 405df74 commit 799fec7

File tree

5 files changed

+67
-12
lines changed

5 files changed

+67
-12
lines changed

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByAnalyticsOperation.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.Optional;
2020
import java.util.stream.Stream;
2121

22+
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
23+
import com.couchbase.client.java.query.QueryScanConsistency;
2224
import org.springframework.dao.IncorrectResultSizeDataAccessException;
2325
import org.springframework.data.couchbase.core.query.AnalyticsQuery;
2426
import org.springframework.lang.Nullable;
@@ -112,6 +114,17 @@ interface FindByAnalyticsWithQuery<T> extends TerminatingFindByAnalytics<T> {
112114

113115
}
114116

115-
interface ExecutableFindByAnalytics<T> extends FindByAnalyticsWithQuery<T> {}
117+
interface FindByAnalyticsConsistentWith<T> extends FindByAnalyticsWithQuery<T> {
118+
119+
/**
120+
* Allows to override the default scan consistency.
121+
*
122+
* @param scanConsistency the custom scan consistency to use for this analytics query.
123+
*/
124+
FindByAnalyticsWithQuery<T> consistentWith(AnalyticsScanConsistency scanConsistency);
125+
126+
}
127+
128+
interface ExecutableFindByAnalytics<T> extends FindByAnalyticsConsistentWith<T> {}
116129

117130
}

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByAnalyticsOperationSupport.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import java.util.List;
1919
import java.util.stream.Stream;
2020

21+
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
22+
import com.couchbase.client.java.query.QueryScanConsistency;
2123
import org.springframework.data.couchbase.core.query.AnalyticsQuery;
24+
import org.springframework.data.couchbase.core.query.Query;
2225

2326
public class ExecutableFindByAnalyticsOperationSupport implements ExecutableFindByAnalyticsOperation {
2427

@@ -32,21 +35,25 @@ public ExecutableFindByAnalyticsOperationSupport(final CouchbaseTemplate templat
3235

3336
@Override
3437
public <T> ExecutableFindByAnalytics<T> findByAnalytics(final Class<T> domainType) {
35-
return new ExecutableFindByAnalyticsSupport<>(template, domainType, ALL_QUERY);
38+
return new ExecutableFindByAnalyticsSupport<>(template, domainType, ALL_QUERY, AnalyticsScanConsistency.NOT_BOUNDED);
3639
}
3740

3841
static class ExecutableFindByAnalyticsSupport<T> implements ExecutableFindByAnalytics<T> {
3942

4043
private final CouchbaseTemplate template;
4144
private final Class<T> domainType;
4245
private final ReactiveFindByAnalyticsOperationSupport.ReactiveFindByAnalyticsSupport<T> reactiveSupport;
46+
private final AnalyticsQuery query;
47+
private final AnalyticsScanConsistency scanConsistency;
4348

4449
ExecutableFindByAnalyticsSupport(final CouchbaseTemplate template, final Class<T> domainType,
45-
final AnalyticsQuery query) {
50+
final AnalyticsQuery query, final AnalyticsScanConsistency scanConsistency) {
4651
this.template = template;
4752
this.domainType = domainType;
53+
this.query = query;
4854
this.reactiveSupport = new ReactiveFindByAnalyticsOperationSupport.ReactiveFindByAnalyticsSupport<>(
49-
template.reactive(), domainType, query);
55+
template.reactive(), domainType, query, scanConsistency);
56+
this.scanConsistency = scanConsistency;
5057
}
5158

5259
@Override
@@ -66,7 +73,12 @@ public List<T> all() {
6673

6774
@Override
6875
public TerminatingFindByAnalytics<T> matching(final AnalyticsQuery query) {
69-
return new ExecutableFindByAnalyticsSupport<>(template, domainType, query);
76+
return new ExecutableFindByAnalyticsSupport<>(template, domainType, query, scanConsistency);
77+
}
78+
79+
@Override
80+
public FindByAnalyticsWithQuery<T> consistentWith(final AnalyticsScanConsistency scanConsistency) {
81+
return new ExecutableFindByAnalyticsSupport<>(template, domainType, query, scanConsistency);
7082
}
7183

7284
@Override

src/main/java/org/springframework/data/couchbase/core/ExecutableFindByQueryOperationSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static class ExecutableFindByQuerySupport<T> implements ExecutableFindByQuery<T>
5050
this.template = template;
5151
this.domainType = domainType;
5252
this.query = query;
53-
this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport<T>(template.reactive(),
53+
this.reactiveSupport = new ReactiveFindByQueryOperationSupport.ReactiveFindByQuerySupport<>(template.reactive(),
5454
domainType, query, scanConsistency);
5555
this.scanConsistency = scanConsistency;
5656
}

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperation.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18+
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
1819
import org.springframework.dao.IncorrectResultSizeDataAccessException;
1920
import reactor.core.publisher.Flux;
2021
import reactor.core.publisher.Mono;
@@ -87,6 +88,17 @@ interface FindByAnalyticsWithQuery<T> extends TerminatingFindByAnalytics<T> {
8788

8889
}
8990

90-
interface ReactiveFindByAnalytics<T> extends FindByAnalyticsWithQuery<T> {}
91+
interface FindByAnalyticsConsistentWith<T> extends FindByAnalyticsWithQuery<T> {
92+
93+
/**
94+
* Allows to override the default scan consistency.
95+
*
96+
* @param scanConsistency the custom scan consistency to use for this analytics query.
97+
*/
98+
FindByAnalyticsWithQuery<T> consistentWith(AnalyticsScanConsistency scanConsistency);
99+
100+
}
101+
102+
interface ReactiveFindByAnalytics<T> extends FindByAnalyticsConsistentWith<T> {}
91103

92104
}

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package org.springframework.data.couchbase.core;
1717

18+
import com.couchbase.client.java.analytics.AnalyticsOptions;
19+
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
20+
import com.couchbase.client.java.query.QueryOptions;
1821
import reactor.core.publisher.Flux;
1922
import reactor.core.publisher.Mono;
2023

@@ -34,25 +37,32 @@ public ReactiveFindByAnalyticsOperationSupport(final ReactiveCouchbaseTemplate t
3437

3538
@Override
3639
public <T> ReactiveFindByAnalytics<T> findByAnalytics(final Class<T> domainType) {
37-
return new ReactiveFindByAnalyticsSupport<>(template, domainType, ALL_QUERY);
40+
return new ReactiveFindByAnalyticsSupport<>(template, domainType, ALL_QUERY, AnalyticsScanConsistency.NOT_BOUNDED);
3841
}
3942

4043
static class ReactiveFindByAnalyticsSupport<T> implements ReactiveFindByAnalytics<T> {
4144

4245
private final ReactiveCouchbaseTemplate template;
4346
private final Class<T> domainType;
4447
private final AnalyticsQuery query;
48+
private final AnalyticsScanConsistency scanConsistency;
4549

4650
ReactiveFindByAnalyticsSupport(final ReactiveCouchbaseTemplate template, final Class<T> domainType,
47-
final AnalyticsQuery query) {
51+
final AnalyticsQuery query, final AnalyticsScanConsistency scanConsistency) {
4852
this.template = template;
4953
this.domainType = domainType;
5054
this.query = query;
55+
this.scanConsistency = scanConsistency;
5156
}
5257

5358
@Override
5459
public TerminatingFindByAnalytics<T> matching(AnalyticsQuery query) {
55-
return new ReactiveFindByAnalyticsSupport<>(template, domainType, query);
60+
return new ReactiveFindByAnalyticsSupport<>(template, domainType, query, scanConsistency);
61+
}
62+
63+
@Override
64+
public FindByAnalyticsWithQuery<T> consistentWith(AnalyticsScanConsistency scanConsistency) {
65+
return new ReactiveFindByAnalyticsSupport<>(template, domainType, query, scanConsistency);
5666
}
5767

5868
@Override
@@ -69,7 +79,7 @@ public Mono<T> first() {
6979
public Flux<T> all() {
7080
return Flux.defer(() -> {
7181
String statement = assembleEntityQuery(false);
72-
return template.getCouchbaseClientFactory().getCluster().reactive().analyticsQuery(statement)
82+
return template.getCouchbaseClientFactory().getCluster().reactive().analyticsQuery(statement, buildAnalyticsOptions())
7383
.onErrorMap(throwable -> {
7484
if (throwable instanceof RuntimeException) {
7585
return template.potentiallyConvertRuntimeException((RuntimeException) throwable);
@@ -90,7 +100,7 @@ public Flux<T> all() {
90100
public Mono<Long> count() {
91101
return Mono.defer(() -> {
92102
String statement = assembleEntityQuery(true);
93-
return template.getCouchbaseClientFactory().getCluster().reactive().analyticsQuery(statement)
103+
return template.getCouchbaseClientFactory().getCluster().reactive().analyticsQuery(statement, buildAnalyticsOptions())
94104
.onErrorMap(throwable -> {
95105
if (throwable instanceof RuntimeException) {
96106
return template.potentiallyConvertRuntimeException((RuntimeException) throwable);
@@ -123,6 +133,14 @@ private String assembleEntityQuery(final boolean count) {
123133
query.appendSkipAndLimit(statement);
124134
return statement.toString();
125135
}
136+
137+
private AnalyticsOptions buildAnalyticsOptions() {
138+
final AnalyticsOptions options = AnalyticsOptions.analyticsOptions();
139+
if (scanConsistency != null) {
140+
options.scanConsistency(scanConsistency);
141+
}
142+
return options;
143+
}
126144
}
127145

128146
}

0 commit comments

Comments
 (0)