Skip to content

Commit 3f6821f

Browse files
committed
Adopt to Reactor 2022.0.0-M4 changes.
Closes #4100
1 parent 1a868ae commit 3f6821f

File tree

3 files changed

+67
-38
lines changed

3 files changed

+67
-38
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private Publisher<Object> executeDeferred(Object[] parameters) {
123123

124124
ReactiveMongoParameterAccessor parameterAccessor = new ReactiveMongoParameterAccessor(method, parameters);
125125

126-
return execute(parameterAccessor);
126+
return parameterAccessor.resolveParameters().flatMapMany(this::execute);
127127
}
128128

129129
private Publisher<Object> execute(MongoParameterAccessor parameterAccessor) {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoParametersParameterAccessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
*/
3939
public class MongoParametersParameterAccessor extends ParametersParameterAccessor implements MongoParameterAccessor {
4040

41-
private final MongoQueryMethod method;
41+
final MongoQueryMethod method;
4242

4343
/**
4444
* Creates a new {@link MongoParametersParameterAccessor}.

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveMongoParameterAccessor.java

+65-36
Original file line numberDiff line numberDiff line change
@@ -20,57 +20,32 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
27+
import org.reactivestreams.Publisher;
2328

2429
import org.springframework.data.repository.util.ReactiveWrapperConverters;
2530
import org.springframework.data.repository.util.ReactiveWrappers;
2631

2732
/**
2833
* Reactive {@link org.springframework.data.repository.query.ParametersParameterAccessor} implementation that subscribes
29-
* to reactive parameter wrapper types upon creation. This class performs synchronization when acessing parameters.
34+
* to reactive parameter wrapper types upon creation. This class performs synchronization when accessing parameters.
3035
*
3136
* @author Mark Paluch
3237
* @author Christoph Strobl
3338
* @since 2.0
3439
*/
3540
class ReactiveMongoParameterAccessor extends MongoParametersParameterAccessor {
3641

37-
private final List<Mono<?>> subscriptions;
42+
private final Object[] values;
3843

3944
public ReactiveMongoParameterAccessor(MongoQueryMethod method, Object[] values) {
4045

4146
super(method, values);
47+
this.values = values;
4248

43-
this.subscriptions = new ArrayList<>(values.length);
44-
45-
for (int i = 0; i < values.length; i++) {
46-
47-
Object value = values[i];
48-
49-
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
50-
subscriptions.add(null);
51-
continue;
52-
}
53-
54-
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
55-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).share());
56-
} else {
57-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().share());
58-
}
59-
}
60-
}
61-
62-
/* (non-Javadoc)
63-
* @see org.springframework.data.repository.query.ParametersParameterAccessor#getValue(int)
64-
*/
65-
@SuppressWarnings("unchecked")
66-
@Override
67-
protected <T> T getValue(int index) {
68-
69-
if (subscriptions.get(index) != null) {
70-
return (T) subscriptions.get(index).block();
71-
}
72-
73-
return super.getValue(index);
7449
}
7550

7651
/* (non-Javadoc)
@@ -86,10 +61,64 @@ public Object[] getValues() {
8661
return result;
8762
}
8863

89-
/* (non-Javadoc)
90-
* @see org.springframework.data.repository.query.ParametersParameterAccessor#getBindableValue(int)
91-
*/
9264
public Object getBindableValue(int index) {
9365
return getValue(getParameters().getBindableParameter(index).getIndex());
9466
}
67+
68+
/**
69+
* Resolve parameters that were provided through reactive wrapper types. Flux is collected into a list, values from
70+
* Mono's are used directly.
71+
*
72+
* @return
73+
*/
74+
@SuppressWarnings("unchecked")
75+
public Mono<ReactiveMongoParameterAccessor> resolveParameters() {
76+
77+
boolean hasReactiveWrapper = false;
78+
79+
for (Object value : values) {
80+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
81+
continue;
82+
}
83+
84+
hasReactiveWrapper = true;
85+
break;
86+
}
87+
88+
if (!hasReactiveWrapper) {
89+
return Mono.just(this);
90+
}
91+
92+
Object[] resolved = new Object[values.length];
93+
Map<Integer, Optional<?>> holder = new ConcurrentHashMap<>();
94+
List<Publisher<?>> publishers = new ArrayList<>();
95+
96+
for (int i = 0; i < values.length; i++) {
97+
98+
Object value = resolved[i] = values[i];
99+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
100+
continue;
101+
}
102+
103+
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
104+
105+
int index = i;
106+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Mono.class) //
107+
.map(Optional::of) //
108+
.defaultIfEmpty(Optional.empty()) //
109+
.doOnNext(it -> holder.put(index, (Optional<?>) it)));
110+
} else {
111+
112+
int index = i;
113+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Flux.class) //
114+
.collectList() //
115+
.doOnNext(it -> holder.put(index, Optional.of(it))));
116+
}
117+
}
118+
119+
return Flux.merge(publishers).then().thenReturn(resolved).map(values -> {
120+
holder.forEach((index, v) -> values[index] = v.orElse(null));
121+
return new ReactiveMongoParameterAccessor(method, values);
122+
});
123+
}
95124
}

0 commit comments

Comments
 (0)