Skip to content

Commit 03531a2

Browse files
DATAMONGO-2089 - Refactor and add Kotlin extension.
1 parent d76456f commit 03531a2

File tree

8 files changed

+173
-89
lines changed

8 files changed

+173
-89
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java

+25-42
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222

2323
import org.bson.BsonTimestamp;
2424
import org.bson.BsonValue;
25-
import org.bson.Document;
2625
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
27-
import org.springframework.data.mongodb.core.ReactiveFindOperation.ReactiveFind;
2826
import org.springframework.data.mongodb.core.aggregation.Aggregation;
2927
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
3028

@@ -54,22 +52,13 @@
5452
*/
5553
public interface ReactiveChangeStreamOperation {
5654

57-
/**
58-
* Start creating a change stream operation.watching all collections within the database. <br />
59-
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
60-
* {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}.
61-
*
62-
* @return new instance of {@link ReactiveFind}. Never {@literal null}.
63-
*/
64-
ReactiveChangeStream<Document> changeStream();
65-
6655
/**
6756
* Start creating a change stream operation for the given {@literal domainType} watching all collections within the
6857
* database. <br />
6958
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
70-
* {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}.
59+
* {@link ChangeStreamWithFilterAndProjection#filter(CriteriaDefinition) filter}.
7160
*
72-
* @param domainType must not be {@literal null}.
61+
* @param domainType must not be {@literal null}. Use {@link org.bson.Document} to obtain raw elements.
7362
* @return new instance of {@link ReactiveChangeStream}. Never {@literal null}.
7463
* @throws IllegalArgumentException if domainType is {@literal null}.
7564
*/
@@ -102,48 +91,42 @@ interface ChangeStreamWithCollection<T> {
10291
* @return new instance of {@link ChangeStreamWithCollection}.
10392
* @throws IllegalArgumentException if collection is {@literal null}.
10493
*/
105-
ChangeStreamWithProjection<T> watchCollection(String collection);
106-
}
107-
108-
/**
109-
* Result type override (optional).
110-
*/
111-
interface ChangeStreamWithProjection<T> extends ChangeStreamWithFilter<T> {
112-
113-
/**
114-
* Define the target type fields should be mapped to.
115-
*
116-
* @param resultType must not be {@literal null}.
117-
* @param <R> result type.
118-
* @return new instance of {@link ChangeStreamWithProjection}.
119-
* @throws IllegalArgumentException if resultType is {@literal null}.
120-
*/
121-
<R> ChangeStreamWithFilter<R> as(Class<R> resultType);
94+
ChangeStreamWithFilterAndProjection<T> watchCollection(String collection);
12295
}
12396

12497
/**
12598
* Provide a filter for limiting results (optional).
12699
*/
127-
interface ChangeStreamWithFilter<T> extends ResumingChangeStream<T>, TerminatingChangeStream<T> {
100+
interface ChangeStreamWithFilterAndProjection<T> extends ResumingChangeStream<T>, TerminatingChangeStream<T> {
128101

129102
/**
130103
* Use an {@link Aggregation} to filter matching events.
131104
*
132105
* @param by must not be {@literal null}.
133-
* @return new instance of {@link ChangeStreamWithProjection}.
106+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
134107
* @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}.
135108
*/
136-
ChangeStreamWithProjection<T> filter(Aggregation by);
109+
ChangeStreamWithFilterAndProjection<T> filter(Aggregation by);
137110

138111
/**
139112
* Use a {@link CriteriaDefinition critera} to filter matching events via an
140113
* {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}.
141114
*
142115
* @param by must not be {@literal null}.
143-
* @return new instance of {@link ChangeStreamWithProjection}.
116+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
144117
* @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}.
145118
*/
146-
ChangeStreamWithProjection<T> filter(CriteriaDefinition by);
119+
ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition by);
120+
121+
/**
122+
* Define the target type fields should be mapped to.
123+
*
124+
* @param resultType must not be {@literal null}.
125+
* @param <R> result type.
126+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
127+
* @throws IllegalArgumentException if resultType is {@literal null}.
128+
*/
129+
<R> ChangeStreamWithFilterAndProjection<R> as(Class<R> resultType);
147130
}
148131

149132
/**
@@ -154,34 +137,34 @@ interface ResumingChangeStream<T> extends TerminatingChangeStream<T> {
154137
/**
155138
* Resume the change stream at a given point.
156139
*
157-
* @param beacon an {@link Instant} or {@link BsonTimestamp}
140+
* @param token an {@link Instant} or {@link BsonTimestamp}
158141
* @return new instance of {@link TerminatingChangeStream}.
159142
* @see ChangeStreamOptionsBuilder#resumeAt(Instant)
160143
* @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp)
161144
* @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}.
162145
*/
163-
TerminatingChangeStream<T> resumeAt(Object beacon);
146+
TerminatingChangeStream<T> resumeAt(Object token);
164147

165148
/**
166149
* Resume the change stream after a given point.
167150
*
168-
* @param beacon an {@link Instant} or {@link BsonTimestamp}
151+
* @param token an {@link Instant} or {@link BsonTimestamp}
169152
* @return new instance of {@link TerminatingChangeStream}.
170153
* @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue)
171154
* @see ChangeStreamOptionsBuilder#resumeToken(BsonValue)
172155
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
173156
*/
174-
TerminatingChangeStream<T> resumeAfter(Object beacon);
157+
TerminatingChangeStream<T> resumeAfter(Object token);
175158

176159
/**
177160
* Start the change stream after a given point.
178161
*
179-
* @param beacon an {@link Instant} or {@link BsonTimestamp}
162+
* @param token an {@link Instant} or {@link BsonTimestamp}
180163
* @return new instance of {@link TerminatingChangeStream}.
181164
* @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue)
182165
* @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
183166
*/
184-
TerminatingChangeStream<T> startAfter(Object beacon);
167+
TerminatingChangeStream<T> startAfter(Object token);
185168
}
186169

187170
/**
@@ -204,5 +187,5 @@ interface ChangeStreamWithOptions<T> {
204187
* {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way.
205188
*/
206189
interface ReactiveChangeStream<T> extends ChangeStreamWithOptions<T>, ChangeStreamWithCollection<T>,
207-
TerminatingChangeStream<T>, ResumingChangeStream<T>, ChangeStreamWithFilter<T> {}
190+
TerminatingChangeStream<T>, ResumingChangeStream<T>, ChangeStreamWithFilterAndProjection<T> {}
208191
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java

+21-27
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,6 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat
4646
this.template = template;
4747
}
4848

49-
/*
50-
* (non-Javadoc)
51-
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream()
52-
*/
53-
@Override
54-
public ReactiveChangeStream<Document> changeStream() {
55-
return new ReactiveChangeStreamSupport(template, null, Document.class, null, null);
56-
}
57-
5849
/*
5950
* (non-Javadoc)
6051
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class)
@@ -67,7 +58,7 @@ public <T> ReactiveChangeStream<T> changeStream(Class<T> domainType) {
6758
}
6859

6960
static class ReactiveChangeStreamSupport<T>
70-
implements ReactiveChangeStream<T>, ChangeStreamWithProjection<T>, ChangeStreamWithFilter<T> {
61+
implements ReactiveChangeStream<T>, ChangeStreamWithFilterAndProjection<T> {
7162

7263
private final ReactiveMongoTemplate template;
7364
private final @Nullable Class<?> domainType;
@@ -90,7 +81,7 @@ private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class<?> dom
9081
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.String)
9182
*/
9283
@Override
93-
public ChangeStreamWithProjection<T> watchCollection(String collection) {
84+
public ChangeStreamWithFilterAndProjection<T> watchCollection(String collection) {
9485

9586
Assert.hasText(collection, "Collection name must not be null nor empty!");
9687
return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options);
@@ -101,14 +92,14 @@ public ChangeStreamWithProjection<T> watchCollection(String collection) {
10192
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object)
10293
*/
10394
@Override
104-
public TerminatingChangeStream<T> resumeAt(Object beacon) {
95+
public TerminatingChangeStream<T> resumeAt(Object token) {
10596

10697
return withOptions(builder -> {
10798

108-
if (beacon instanceof Instant) {
109-
builder.resumeAt((Instant) beacon);
110-
} else if (beacon instanceof BsonTimestamp) {
111-
builder.resumeAt((BsonTimestamp) beacon);
99+
if (token instanceof Instant) {
100+
builder.resumeAt((Instant) token);
101+
} else if (token instanceof BsonTimestamp) {
102+
builder.resumeAt((BsonTimestamp) token);
112103
}
113104
});
114105
}
@@ -118,21 +109,21 @@ public TerminatingChangeStream<T> resumeAt(Object beacon) {
118109
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAfter(java.lang.Object)
119110
*/
120111
@Override
121-
public TerminatingChangeStream<T> resumeAfter(Object beacon) {
112+
public TerminatingChangeStream<T> resumeAfter(Object token) {
122113

123-
Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue");
124-
return withOptions(builder -> builder.resumeAfter((BsonValue) beacon));
114+
Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
115+
return withOptions(builder -> builder.resumeAfter((BsonValue) token));
125116
}
126117

127118
/*
128119
* (non-Javadoc)
129120
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#startAfter(java.lang.Object)
130121
*/
131122
@Override
132-
public TerminatingChangeStream<T> startAfter(Object beacon) {
123+
public TerminatingChangeStream<T> startAfter(Object token) {
133124

134-
Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue");
135-
return withOptions(builder -> builder.startAfter((BsonValue) beacon));
125+
Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
126+
return withOptions(builder -> builder.startAfter((BsonValue) token));
136127
}
137128

138129
/*
@@ -153,7 +144,9 @@ public ReactiveChangeStreamSupport<T> withOptions(Consumer<ChangeStreamOptionsBu
153144
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithProjection#as(java.lang.Class)
154145
*/
155146
@Override
156-
public <R> ChangeStreamWithFilter<R> as(Class<R> resultType) {
147+
public <R> ChangeStreamWithFilterAndProjection<R> as(Class<R> resultType) {
148+
149+
Assert.notNull(resultType, "ResultType must not be null!");
157150
return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options);
158151
}
159152

@@ -162,19 +155,20 @@ public <R> ChangeStreamWithFilter<R> as(Class<R> resultType) {
162155
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.aggregation.Aggregation)
163156
*/
164157
@Override
165-
public ChangeStreamWithProjection<T> filter(Aggregation aggregation) {
166-
return withOptions(builder -> builder.filter(aggregation));
158+
public ChangeStreamWithFilterAndProjection<T> filter(Aggregation filter) {
159+
return withOptions(builder -> builder.filter(filter));
167160
}
168161

169162
/*
170163
* (non-Javadoc)
171164
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.query.CriteriaDefinition)
172165
*/
173166
@Override
174-
public ChangeStreamWithProjection<T> filter(CriteriaDefinition by) {
167+
public ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition by) {
175168

176169
MatchOperation $match = Aggregation.match(by);
177-
Aggregation aggregation = domainType != null ? Aggregation.newAggregation(domainType, $match)
170+
Aggregation aggregation = domainType != null && !Document.class.equals(domainType)
171+
? Aggregation.newAggregation(domainType, $match)
178172
: Aggregation.newAggregation($match);
179173
return filter(aggregation);
180174
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

-9
Original file line numberDiff line numberDiff line change
@@ -2269,15 +2269,6 @@ public <T> ReactiveMapReduce<T> mapReduce(Class<T> domainType) {
22692269
return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType);
22702270
}
22712271

2272-
/*
2273-
* (non-Javadoc)
2274-
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream()
2275-
*/
2276-
@Override
2277-
public ReactiveChangeStream<Document> changeStream() {
2278-
return new ReactiveChangeStreamOperationSupport(this).changeStream();
2279-
}
2280-
22812272
/*
22822273
* (non-Javadoc)
22832274
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core
17+
18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.flow.asFlow
21+
22+
23+
/**
24+
* Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters.
25+
*
26+
* @author Christoph Strobl
27+
* @since 2.2
28+
*/
29+
inline fun <reified T : Any> ReactiveChangeStreamOperation.changeStream(): ReactiveChangeStreamOperation.ReactiveChangeStream<T> =
30+
changeStream(T::class.java)
31+
32+
/**
33+
* Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters.
34+
*
35+
* @author Christoph Strobl
36+
* @since 2.2
37+
*/
38+
inline fun <reified T : Any> ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<*>.asType(): ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> =
39+
`as`(T::class.java)
40+
41+
/**
42+
* Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen].
43+
*
44+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
45+
* and [org.reactivestreams.Subscription.request] size.
46+
*
47+
* @author Christoph Strobl
48+
* @since 2.2
49+
*/
50+
@FlowPreview
51+
fun <T : Any> ReactiveChangeStreamOperation.TerminatingChangeStream<T>.flow(batchSize: Int = 1): Flow<ChangeStreamEvent<T>> =
52+
listen().asFlow(batchSize)
53+

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void changeStreamEventsShouldBeEmittedCorrectly() {
7070

7171
BlockingQueue<ChangeStreamEvent<Document>> documents = new LinkedBlockingQueue<>(100);
7272

73-
Disposable disposable = template.changeStream() //
73+
Disposable disposable = template.changeStream(Document.class) //
7474
.watchCollection("person") //
7575
.listen() //
7676
.doOnNext(documents::add).subscribe();

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void setUp() {
5959
@Test // DATAMONGO-2089
6060
public void listenWithoutDomainTypeUsesDocumentAsDefault() {
6161

62-
changeStreamSupport.changeStream().listen().subscribe();
62+
changeStreamSupport.changeStream(Document.class).listen().subscribe();
6363

6464
verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Document.class));
6565
}
@@ -104,7 +104,7 @@ public void listenWithDomainTypeCreatesTypedAggregation() {
104104
public void listenWithoutDomainTypeCreatesUntypedAggregation() {
105105

106106
Criteria criteria = where("operationType").is("insert");
107-
changeStreamSupport.changeStream().filter(criteria).listen().subscribe();
107+
changeStreamSupport.changeStream(Document.class).filter(criteria).listen().subscribe();
108108

109109
ArgumentCaptor<ChangeStreamOptions> optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class);
110110
verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class));
@@ -126,7 +126,7 @@ public void optionsShouldBePassedOnCorrectly() {
126126

127127
Document filter = new Document("$match", new Document("operationType", "insert"));
128128

129-
changeStreamSupport.changeStream().withOptions(options -> {
129+
changeStreamSupport.changeStream(Document.class).withOptions(options -> {
130130
options.filter(filter);
131131
}).listen().subscribe();
132132

@@ -144,7 +144,7 @@ public void optionsShouldBeCombinedCorrectly() {
144144
Document filter = new Document("$match", new Document("operationType", "insert"));
145145
Instant resumeTimestamp = Instant.now();
146146

147-
changeStreamSupport.changeStream().withOptions(options -> {
147+
changeStreamSupport.changeStream(Document.class).withOptions(options -> {
148148
options.filter(filter);
149149
}).resumeAt(resumeTimestamp).listen().subscribe();
150150

0 commit comments

Comments
 (0)