Skip to content

Commit f416c2c

Browse files
committed
Adopt to Reactor 2022 changes.
Closes #1283
1 parent b5bd1c0 commit f416c2c

File tree

4 files changed

+73
-33
lines changed

4 files changed

+73
-33
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/AbstractReactiveCassandraQuery.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.cassandra.repository.query;
1717

18-
import reactor.core.publisher.Flux;
1918
import reactor.core.publisher.Mono;
2019

2120
import org.reactivestreams.Publisher;
@@ -72,14 +71,17 @@ public ReactiveCassandraQueryMethod getQueryMethod() {
7271

7372
@Override
7473
public Object execute(Object[] parameters) {
75-
return Flux.defer(() -> executeLater(parameters));
76-
}
77-
78-
private Publisher<Object> executeLater(Object[] parameters) {
7974

8075
ReactiveCassandraParameterAccessor parameterAccessor = new ReactiveCassandraParameterAccessor(getQueryMethod(),
8176
parameters);
8277

78+
Mono<ReactiveCassandraParameterAccessor> resolved = parameterAccessor.resolveParameters();
79+
80+
return resolved.flatMapMany(this::executeLater);
81+
}
82+
83+
private Publisher<Object> executeLater(ReactiveCassandraParameterAccessor parameterAccessor) {
84+
8385
CassandraParameterAccessor convertingParameterAccessor = new ConvertingParameterAccessor(
8486
getRequiredConverter(getReactiveCassandraOperations()), parameterAccessor);
8587

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraParameterAccessor.java

+64-25
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717

1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.MonoProcessor;
2120

2221
import java.util.ArrayList;
2322
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
27+
import org.reactivestreams.Publisher;
2428

2529
import org.springframework.data.repository.util.ReactiveWrapperConverters;
2630
import org.springframework.data.repository.util.ReactiveWrappers;
@@ -35,35 +39,13 @@
3539
class ReactiveCassandraParameterAccessor extends CassandraParametersParameterAccessor {
3640

3741
private final Object[] values;
42+
private final CassandraQueryMethod method;
3843

39-
private final List<MonoProcessor<?>> subscriptions;
40-
41-
@SuppressWarnings("ConstantConditions")
4244
ReactiveCassandraParameterAccessor(CassandraQueryMethod method, Object[] values) {
4345

4446
super(method, values);
45-
47+
this.method = method;
4648
this.values = values;
47-
this.subscriptions = new ArrayList<>(values.length);
48-
49-
for (Object value : values) {
50-
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
51-
subscriptions.add(null);
52-
continue;
53-
}
54-
55-
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
56-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).toProcessor());
57-
} else {
58-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().toProcessor());
59-
}
60-
}
61-
}
62-
63-
@SuppressWarnings({ "unchecked", "ConstantConditions" })
64-
@Override
65-
protected <T> T getValue(int index) {
66-
return (subscriptions.get(index) != null ? (T) subscriptions.get(index).block() : super.getValue(index));
6749
}
6850

6951
@Override
@@ -81,4 +63,61 @@ public Object[] getValues() {
8163
public Object getBindableValue(int index) {
8264
return getValue(getParameters().getBindableParameter(index).getIndex());
8365
}
66+
67+
/**
68+
* Resolve parameters that were provided through reactive wrapper types. Flux is collected into a list, values from
69+
* Mono's are used directly.
70+
*
71+
* @return
72+
*/
73+
@SuppressWarnings("unchecked")
74+
public Mono<ReactiveCassandraParameterAccessor> resolveParameters() {
75+
76+
boolean hasReactiveWrapper = false;
77+
78+
for (Object value : values) {
79+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
80+
continue;
81+
}
82+
83+
hasReactiveWrapper = true;
84+
break;
85+
}
86+
87+
if (!hasReactiveWrapper) {
88+
return Mono.just(this);
89+
}
90+
91+
Object[] resolved = new Object[values.length];
92+
Map<Integer, Optional<?>> holder = new ConcurrentHashMap<>();
93+
List<Publisher<?>> publishers = new ArrayList<>();
94+
95+
for (int i = 0; i < values.length; i++) {
96+
97+
Object value = resolved[i] = values[i];
98+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
99+
continue;
100+
}
101+
102+
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
103+
104+
int index = i;
105+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Mono.class) //
106+
.map(Optional::of) //
107+
.defaultIfEmpty(Optional.empty()) //
108+
.doOnNext(it -> holder.put(index, (Optional<?>) it)));
109+
} else {
110+
111+
int index = i;
112+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Flux.class) //
113+
.collectList() //
114+
.doOnNext(it -> holder.put(index, Optional.of(it))));
115+
}
116+
}
117+
118+
return Flux.merge(publishers).then().thenReturn(resolved).map(values -> {
119+
holder.forEach((index, v) -> values[index] = v.orElse(null));
120+
return new ReactiveCassandraParameterAccessor(method, values);
121+
});
122+
}
84123
}

spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/cql/CassandraExceptionTranslatorUnitTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ void shouldTranslateWithCqlMessage() {
241241
InvalidConfigurationInQueryException cx = new InvalidConfigurationInQueryException(node, "err");
242242
DataAccessException dax = sut.translate("Query", "SELECT * FROM person", cx);
243243

244-
assertThat(dax).hasRootCauseInstanceOf(InvalidConfigurationInQueryException.class).hasMessage(
245-
"Query; CQL [SELECT * FROM person]; err; nested exception is com.datastax.oss.driver.api.core.servererrors.InvalidConfigurationInQueryException: err");
244+
assertThat(dax).hasRootCauseInstanceOf(InvalidConfigurationInQueryException.class)
245+
.hasMessageContaining("Query; CQL [SELECT * FROM person]; err");
246246
}
247247
}

spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraParameterAccessorUnitTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ void shouldReturnNoTypeForComplexTypes() throws Exception {
7373
getCassandraQueryMethod(method), new Object[] { Flux.just(LocalDateTime.of(2000, 10, 11, 12, 13, 14)) });
7474

7575
assertThat(accessor.getDataType(0)).isNull();
76-
7776
}
7877

7978
@Test // DATACASS-335

0 commit comments

Comments
 (0)