Skip to content

Commit ada6eb8

Browse files
committed
DATAMONGO-2089 - Polishing.
Add watchCollection(…) accepting an entity class. Use static import for assertions. Tweak javadoc. Original pull request: #751.
1 parent 4a17048 commit ada6eb8

File tree

8 files changed

+69
-39
lines changed

8 files changed

+69
-39
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
/**
3030
* {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB
31-
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> operations in a fluent API * style. <br />
31+
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> operations in a fluent API style. <br />
3232
* The starting {@literal domainType} is used for mapping a potentially given
3333
* {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the
3434
* originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}.
@@ -38,15 +38,14 @@
3838
*
3939
* <pre>
4040
* <code>
41-
* changeStream(Human.class)
41+
* changeStream(Jedi.class)
4242
* .watchCollection("star-wars")
4343
* .filter(where("operationType").is("insert"))
44-
* .as(Jedi.class)
4544
* .resumeAt(Instant.now())
4645
* .listen();
4746
* </code>
4847
* </pre>
49-
*
48+
*
5049
* @author Christoph Strobl
5150
* @since 2.2
5251
*/
@@ -88,10 +87,20 @@ interface ChangeStreamWithCollection<T> {
8887
* Skip this step to watch all collections within the database.
8988
*
9089
* @param collection must not be {@literal null} nor {@literal empty}.
91-
* @return new instance of {@link ChangeStreamWithCollection}.
92-
* @throws IllegalArgumentException if collection is {@literal null}.
90+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
91+
* @throws IllegalArgumentException if {@code collection} is {@literal null}.
9392
*/
9493
ChangeStreamWithFilterAndProjection<T> watchCollection(String collection);
94+
95+
/**
96+
* Set the the collection to watch. Collection name is derived from the {@link Class entityClass}.<br />
97+
* Skip this step to watch all collections within the database.
98+
*
99+
* @param entityClass must not be {@literal null}.
100+
* @return new instance of {@link ChangeStreamWithFilterAndProjection}.
101+
* @throws IllegalArgumentException if {@code entityClass} is {@literal null}.
102+
*/
103+
ChangeStreamWithFilterAndProjection<T> watchCollection(Class<?> entityClass);
95104
}
96105

97106
/**

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat
5454
public <T> ReactiveChangeStream<T> changeStream(Class<T> domainType) {
5555

5656
Assert.notNull(domainType, "DomainType must not be null!");
57-
return new ReactiveChangeStreamSupport(template, domainType, domainType, null, null);
57+
return new ReactiveChangeStreamSupport<>(template, domainType, domainType, null, null);
5858
}
5959

6060
static class ReactiveChangeStreamSupport<T>
6161
implements ReactiveChangeStream<T>, ChangeStreamWithFilterAndProjection<T> {
6262

6363
private final ReactiveMongoTemplate template;
64-
private final @Nullable Class<?> domainType;
64+
private final Class<?> domainType;
6565
private final Class<T> returnType;
6666
private final @Nullable String collection;
6767
private final @Nullable ChangeStreamOptions options;
@@ -84,9 +84,22 @@ private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class<?> dom
8484
public ChangeStreamWithFilterAndProjection<T> watchCollection(String collection) {
8585

8686
Assert.hasText(collection, "Collection name must not be null nor empty!");
87+
8788
return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options);
8889
}
8990

91+
/*
92+
* (non-Javadoc)
93+
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.Class)
94+
*/
95+
@Override
96+
public ChangeStreamWithFilterAndProjection<T> watchCollection(Class<?> entityClass) {
97+
98+
Assert.notNull(entityClass, "Collection type not be null!");
99+
100+
return watchCollection(template.getCollectionName(entityClass));
101+
}
102+
90103
/*
91104
* (non-Javadoc)
92105
* @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object)
@@ -112,6 +125,7 @@ public TerminatingChangeStream<T> resumeAt(Object token) {
112125
public TerminatingChangeStream<T> resumeAfter(Object token) {
113126

114127
Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
128+
115129
return withOptions(builder -> builder.resumeAfter((BsonValue) token));
116130
}
117131

@@ -123,6 +137,7 @@ public TerminatingChangeStream<T> resumeAfter(Object token) {
123137
public TerminatingChangeStream<T> startAfter(Object token) {
124138

125139
Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
140+
126141
return withOptions(builder -> builder.startAfter((BsonValue) token));
127142
}
128143

@@ -147,6 +162,7 @@ public ReactiveChangeStreamSupport<T> withOptions(Consumer<ChangeStreamOptionsBu
147162
public <R> ChangeStreamWithFilterAndProjection<R> as(Class<R> resultType) {
148163

149164
Assert.notNull(resultType, "ResultType must not be null!");
165+
150166
return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options);
151167
}
152168

@@ -167,8 +183,7 @@ public ChangeStreamWithFilterAndProjection<T> filter(Aggregation filter) {
167183
public ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition by) {
168184

169185
MatchOperation $match = Aggregation.match(by);
170-
Aggregation aggregation = domainType != null && !Document.class.equals(domainType)
171-
? Aggregation.newAggregation(domainType, $match)
186+
Aggregation aggregation = !Document.class.equals(domainType) ? Aggregation.newAggregation(domainType, $match)
172187
: Aggregation.newAggregation($match);
173188
return filter(aggregation);
174189
}
@@ -208,8 +223,8 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() {
208223
options.getResumeTimestamp().ifPresent(builder::resumeAt);
209224
options.getResumeBsonTimestamp().ifPresent(builder::resumeAt);
210225
}
211-
return builder;
212226

227+
return builder;
213228
}
214229
}
215230
}

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ import kotlinx.coroutines.FlowPreview
1919
import kotlinx.coroutines.flow.Flow
2020
import kotlinx.coroutines.reactive.flow.asFlow
2121

22-
2322
/**
24-
* Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters.
23+
* Extension for [RactiveChangeStreamOperation.changeStream] leveraging reified type parameters.
2524
*
2625
* @author Christoph Strobl
2726
* @since 2.2
@@ -30,7 +29,7 @@ inline fun <reified T : Any> ReactiveChangeStreamOperation.changeStream(): React
3029
changeStream(T::class.java)
3130

3231
/**
33-
* Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters.
32+
* Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection.as] leveraging reified type parameters.
3433
*
3534
* @author Christoph Strobl
3635
* @since 2.2
@@ -39,7 +38,7 @@ inline fun <reified T : Any> ReactiveChangeStreamOperation.ChangeStreamWithFilte
3938
`as`(T::class.java)
4039

4140
/**
42-
* Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen].
41+
* Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream.listen].
4342
*
4443
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
4544
* and [org.reactivestreams.Subscription.request] size.

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.mongodb.core;
1717

18+
import static org.assertj.core.api.Assertions.*;
1819
import static org.springframework.data.mongodb.core.query.Criteria.*;
1920

2021
import lombok.SneakyThrows;
@@ -27,18 +28,20 @@
2728
import java.util.concurrent.LinkedBlockingQueue;
2829
import java.util.stream.Collectors;
2930

30-
import org.assertj.core.api.Assertions;
3131
import org.bson.Document;
3232
import org.junit.After;
3333
import org.junit.Before;
3434
import org.junit.ClassRule;
3535
import org.junit.Test;
36+
3637
import org.springframework.data.mongodb.test.util.MongoTestUtils;
3738
import org.springframework.data.mongodb.test.util.ReplicaSet;
3839

3940
import com.mongodb.reactivestreams.client.MongoClient;
4041

4142
/**
43+
* Tests for {@link ReactiveChangeStreamOperation}.
44+
*
4245
* @author Christoph Strobl
4346
* @currentRead Dawn Cook - The Decoy Princess
4447
*/
@@ -91,8 +94,8 @@ public void changeStreamEventsShouldBeEmittedCorrectly() {
9194
Thread.sleep(500); // just give it some time to link receive all events
9295

9396
try {
94-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
95-
.allMatch(val -> val instanceof Document);
97+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
98+
.allMatch(Document.class::isInstance);
9699
} finally {
97100
disposable.dispose();
98101
}
@@ -122,7 +125,7 @@ public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedExc
122125
Thread.sleep(500); // just give it some time to link receive all events
123126

124127
try {
125-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
128+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
126129
.containsExactly(person1, person2, person3);
127130
} finally {
128131
disposable.dispose();
@@ -135,7 +138,7 @@ public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedExce
135138
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
136139

137140
Disposable disposable = template.changeStream(Person.class) //
138-
.watchCollection("person") //
141+
.watchCollection(Person.class) //
139142
.filter(where("age").gte(38)) //
140143
.listen() //
141144
.doOnNext(documents::add).subscribe();
@@ -146,18 +149,17 @@ public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedExce
146149
Person person2 = new Person("Data", 37);
147150
Person person3 = new Person("MongoDB", 39);
148151

149-
Flux.merge(template.save(person1).delayElement(Duration.ofMillis(2)),
150-
template.save(person2).delayElement(Duration.ofMillis(2)),
151-
template.save(person3).delayElement(Duration.ofMillis(2))) //
152+
Flux.merge(template.save(person1), template.save(person2).delayElement(Duration.ofMillis(50)),
153+
template.save(person3).delayElement(Duration.ofMillis(100))) //
152154
.as(StepVerifier::create) //
153155
.expectNextCount(3) //
154156
.verifyComplete();
155157

156158
Thread.sleep(500); // just give it some time to link receive all events
157159

158160
try {
159-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
160-
.containsExactly(person1, person3);
161+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).containsOnly(person1,
162+
person3);
161163
} finally {
162164
disposable.dispose();
163165
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package org.springframework.data.mongodb.core;
1717

1818
import static org.assertj.core.api.Assertions.*;
19-
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.ArgumentMatchers.*;
2020
import static org.mockito.Mockito.*;
2121
import static org.mockito.Mockito.eq;
2222
import static org.mockito.Mockito.isNull;
@@ -36,11 +36,14 @@
3636
import org.mockito.ArgumentCaptor;
3737
import org.mockito.Mock;
3838
import org.mockito.junit.MockitoJUnitRunner;
39+
3940
import org.springframework.data.mongodb.core.aggregation.Aggregation;
4041
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
4142
import org.springframework.data.mongodb.core.query.Criteria;
4243

4344
/**
45+
* Unit tests for {@link ReactiveChangeStreamOperationSupport}.
46+
*
4447
* @author Christoph Strobl
4548
* @currentRead Dawn Cook - The Decoy Princess
4649
*/

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -1390,7 +1390,7 @@ public void changeStreamEventsShouldBeEmittedCorrectly() throws InterruptedExcep
13901390
Thread.sleep(500); // just give it some time to link receive all events
13911391

13921392
try {
1393-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
1393+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
13941394
.allMatch(val -> val instanceof Document);
13951395
} finally {
13961396
disposable.dispose();
@@ -1422,7 +1422,7 @@ public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedExc
14221422
Thread.sleep(500); // just give it some time to link receive all events
14231423

14241424
try {
1425-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1425+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
14261426
.containsExactly(person1, person2, person3);
14271427
} finally {
14281428
disposable.dispose();
@@ -1455,7 +1455,7 @@ public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedExce
14551455
Thread.sleep(500); // just give it some time to link receive all events
14561456

14571457
try {
1458-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1458+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
14591459
.containsExactly(person1, person3);
14601460
} finally {
14611461
disposable.dispose();
@@ -1499,7 +1499,7 @@ public void mapsReservedWordsCorrectly() throws InterruptedException {
14991499
Thread.sleep(500); // just give it some time to link receive all events
15001500

15011501
try {
1502-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1502+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
15031503
.containsExactly(replacement);
15041504
} finally {
15051505
disposable.dispose();
@@ -1541,7 +1541,7 @@ public void changeStreamEventsShouldBeResumedCorrectly() throws InterruptedExcep
15411541
Thread.sleep(500); // just give it some time to link receive all events
15421542

15431543
try {
1544-
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1544+
assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
15451545
.containsExactly(person2, person3);
15461546
} finally {
15471547
disposable.dispose();
@@ -1563,7 +1563,7 @@ public void removeShouldConsiderLimit() {
15631563

15641564
template.remove(query(where("field").is("lannister")).limit(25), Sample.class) //
15651565
.as(StepVerifier::create) //
1566-
.assertNext(wr -> Assertions.assertThat(wr.getDeletedCount()).isEqualTo(25L)).verifyComplete();
1566+
.assertNext(wr -> assertThat(wr.getDeletedCount()).isEqualTo(25L)).verifyComplete();
15671567
}
15681568

15691569
@Test // DATAMONGO-1870
@@ -1577,7 +1577,7 @@ public void removeShouldConsiderSkipAndSort() {
15771577

15781578
template.remove(new Query().skip(25).with(Sort.by("field")), Sample.class) //
15791579
.as(StepVerifier::create) //
1580-
.assertNext(wr -> Assertions.assertThat(wr.getDeletedCount()).isEqualTo(75L)).verifyComplete();
1580+
.assertNext(wr -> assertThat(wr.getDeletedCount()).isEqualTo(75L)).verifyComplete();
15811581

15821582
template.count(query(where("field").is("lannister")), Sample.class).as(StepVerifier::create).expectNext(25L)
15831583
.verifyComplete();
@@ -1651,7 +1651,7 @@ public void watchesDatabaseCorrectly() throws InterruptedException {
16511651
Thread.sleep(500); // just give it some time to link receive all events
16521652

16531653
try {
1654-
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1654+
assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
16551655
.containsExactly(person1, person2, person3);
16561656
} finally {
16571657
disposable.dispose();
@@ -1701,7 +1701,7 @@ public void resumesAtTimestampCorrectly() throws InterruptedException {
17011701
Thread.sleep(500); // just give it some time to link receive all events
17021702

17031703
try {
1704-
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1704+
assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
17051705
.containsExactly(person2, person3);
17061706
} finally {
17071707
disposable.dispose();

spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import reactor.core.publisher.Flux
3434
class ReactiveChangeStreamOperationExtensionsTests {
3535

3636
val operation = mockk<ReactiveChangeStreamOperation>(relaxed = true)
37+
val changestream = mockk<ReactiveChangeStreamOperation.ReactiveChangeStream<First>>(relaxed = true)
3738

3839
@Test // DATAMONGO-2089
3940
fun `ReactiveChangeStreamOperation#changeStream() with reified type parameter extension should call its Java counterpart`() {
@@ -61,4 +62,6 @@ class ReactiveChangeStreamOperationExtensionsTests {
6162
spec.listen()
6263
}
6364
}
65+
66+
data class Last(val id: String)
6467
}

src/main/asciidoc/reference/change-streams.adoc

+3-4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ Subscribing to Change Streams with the reactive API is a more natural approach t
5252
[source,java]
5353
----
5454
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) <1>
55-
.watchCollection("persons")
55+
.watchCollection("people")
5656
.filter(where("age").gte(38)) <2>
5757
.listen(); <3>
5858
----
@@ -72,9 +72,8 @@ The following example shows how to set the resume offset using server time:
7272
====
7373
[source,java]
7474
----
75-
Flux<ChangeStreamEvent<Person>> resumed = template.changeStream()
76-
.watchCollection("persons")
77-
.as(User.class)
75+
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
76+
.watchCollection("people")
7877
.resumeAt(Instant.now().minusSeconds(1)) <1>
7978
.listen();
8079
----

0 commit comments

Comments
 (0)