diff --git a/pom.xml b/pom.xml
index 340e6922b4..cf7c2a99d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2089-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index bb7d9f03cc..6f8d9a6f8e 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2089-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index b32dcba387..49891d9610 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -14,7 +14,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2089-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index b611cf01a8..3fb5cfeb32 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -11,7 +11,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2089-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java
new file mode 100644
index 0000000000..521cd9569e
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import reactor.core.publisher.Flux;
+
+import java.time.Instant;
+import java.util.function.Consumer;
+
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
+import org.springframework.data.mongodb.core.query.CriteriaDefinition;
+
+/**
+ * {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB
+ * Change Stream operations in a fluent API * style.
+ * The starting {@literal domainType} is used for mapping a potentially given
+ * {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the
+ * originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}.
+ * However, it is possible to define an different {@literal returnType} via {@code as}.
+ * The collection to operate on is optional in which case call collection with the actual database are watched, use
+ * {@literal watchCollection} to define a fixed collection.
+ *
+ *
+ *
+ * changeStream(Human.class)
+ * .watchCollection("star-wars")
+ * .filter(where("operationType").is("insert"))
+ * .as(Jedi.class)
+ * .resumeAt(Instant.now())
+ * .listen();
+ *
+ *
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+public interface ReactiveChangeStreamOperation {
+
+ /**
+ * Start creating a change stream operation for the given {@literal domainType} watching all collections within the
+ * database.
+ * Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or
+ * {@link ChangeStreamWithFilterAndProjection#filter(CriteriaDefinition) filter}.
+ *
+ * @param domainType must not be {@literal null}. Use {@link org.bson.Document} to obtain raw elements.
+ * @return new instance of {@link ReactiveChangeStream}. Never {@literal null}.
+ * @throws IllegalArgumentException if domainType is {@literal null}.
+ */
+ ReactiveChangeStream changeStream(Class domainType);
+
+ /**
+ * Compose change stream execution by calling one of the terminating methods.
+ */
+ interface TerminatingChangeStream {
+
+ /**
+ * Start listening to changes. The stream will not be completed unless the {@link org.reactivestreams.Subscription}
+ * is {@link org.reactivestreams.Subscription#cancel() canceled}.
+ *
+ * However, the stream may become dead, or invalid, if all watched collections, databases are dropped.
+ */
+ Flux> listen();
+ }
+
+ /**
+ * Collection override (optional).
+ */
+ interface ChangeStreamWithCollection {
+
+ /**
+ * Explicitly set the name of the collection to watch.
+ * Skip this step to watch all collections within the database.
+ *
+ * @param collection must not be {@literal null} nor {@literal empty}.
+ * @return new instance of {@link ChangeStreamWithCollection}.
+ * @throws IllegalArgumentException if collection is {@literal null}.
+ */
+ ChangeStreamWithFilterAndProjection watchCollection(String collection);
+ }
+
+ /**
+ * Provide a filter for limiting results (optional).
+ */
+ interface ChangeStreamWithFilterAndProjection extends ResumingChangeStream, TerminatingChangeStream {
+
+ /**
+ * Use an {@link Aggregation} to filter matching events.
+ *
+ * @param by must not be {@literal null}.
+ * @return new instance of {@link ChangeStreamWithFilterAndProjection}.
+ * @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}.
+ */
+ ChangeStreamWithFilterAndProjection filter(Aggregation by);
+
+ /**
+ * Use a {@link CriteriaDefinition critera} to filter matching events via an
+ * {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}.
+ *
+ * @param by must not be {@literal null}.
+ * @return new instance of {@link ChangeStreamWithFilterAndProjection}.
+ * @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}.
+ */
+ ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by);
+
+ /**
+ * Define the target type fields should be mapped to.
+ *
+ * @param resultType must not be {@literal null}.
+ * @param result type.
+ * @return new instance of {@link ChangeStreamWithFilterAndProjection}.
+ * @throws IllegalArgumentException if resultType is {@literal null}.
+ */
+ ChangeStreamWithFilterAndProjection as(Class resultType);
+ }
+
+ /**
+ * Resume a change stream. (optional).
+ */
+ interface ResumingChangeStream extends TerminatingChangeStream {
+
+ /**
+ * Resume the change stream at a given point.
+ *
+ * @param token an {@link Instant} or {@link BsonTimestamp}
+ * @return new instance of {@link TerminatingChangeStream}.
+ * @see ChangeStreamOptionsBuilder#resumeAt(Instant)
+ * @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp)
+ * @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}.
+ */
+ TerminatingChangeStream resumeAt(Object token);
+
+ /**
+ * Resume the change stream after a given point.
+ *
+ * @param token an {@link Instant} or {@link BsonTimestamp}
+ * @return new instance of {@link TerminatingChangeStream}.
+ * @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue)
+ * @see ChangeStreamOptionsBuilder#resumeToken(BsonValue)
+ * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
+ */
+ TerminatingChangeStream resumeAfter(Object token);
+
+ /**
+ * Start the change stream after a given point.
+ *
+ * @param token an {@link Instant} or {@link BsonTimestamp}
+ * @return new instance of {@link TerminatingChangeStream}.
+ * @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue)
+ * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}.
+ */
+ TerminatingChangeStream startAfter(Object token);
+ }
+
+ /**
+ * Provide some options.
+ */
+ interface ChangeStreamWithOptions {
+
+ /**
+ * Provide some options via the callback by modifying the given {@link ChangeStreamOptionsBuilder}. Previously
+ * defined options like a {@link ResumingChangeStream#resumeAfter(Object) resumeToken} are carried over to the
+ * builder and can be overwritten via eg. {@link ChangeStreamOptionsBuilder#resumeToken(BsonValue)}.
+ *
+ * @param optionsConsumer never {@literal null}.
+ * @return new instance of {@link ReactiveChangeStream}.
+ */
+ ReactiveChangeStream withOptions(Consumer optionsConsumer);
+ }
+
+ /**
+ * {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way.
+ */
+ interface ReactiveChangeStream extends ChangeStreamWithOptions, ChangeStreamWithCollection,
+ TerminatingChangeStream, ResumingChangeStream, ChangeStreamWithFilterAndProjection {}
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java
new file mode 100644
index 0000000000..1090f8e417
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import reactor.core.publisher.Flux;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
+import org.springframework.data.mongodb.core.aggregation.MatchOperation;
+import org.springframework.data.mongodb.core.query.CriteriaDefinition;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+
+/**
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation {
+
+ private final ReactiveMongoTemplate template;
+
+ /**
+ * @param template must not be {@literal null}.
+ */
+ ReactiveChangeStreamOperationSupport(ReactiveMongoTemplate template) {
+ this.template = template;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class)
+ */
+ @Override
+ public ReactiveChangeStream changeStream(Class domainType) {
+
+ Assert.notNull(domainType, "DomainType must not be null!");
+ return new ReactiveChangeStreamSupport(template, domainType, domainType, null, null);
+ }
+
+ static class ReactiveChangeStreamSupport
+ implements ReactiveChangeStream, ChangeStreamWithFilterAndProjection {
+
+ private final ReactiveMongoTemplate template;
+ private final @Nullable Class> domainType;
+ private final Class returnType;
+ private final @Nullable String collection;
+ private final @Nullable ChangeStreamOptions options;
+
+ private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class> domainType, Class returnType,
+ @Nullable String collection, @Nullable ChangeStreamOptions options) {
+
+ this.template = template;
+ this.domainType = domainType;
+ this.returnType = returnType;
+ this.collection = collection;
+ this.options = options;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.String)
+ */
+ @Override
+ public ChangeStreamWithFilterAndProjection watchCollection(String collection) {
+
+ Assert.hasText(collection, "Collection name must not be null nor empty!");
+ return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object)
+ */
+ @Override
+ public TerminatingChangeStream resumeAt(Object token) {
+
+ return withOptions(builder -> {
+
+ if (token instanceof Instant) {
+ builder.resumeAt((Instant) token);
+ } else if (token instanceof BsonTimestamp) {
+ builder.resumeAt((BsonTimestamp) token);
+ }
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAfter(java.lang.Object)
+ */
+ @Override
+ public TerminatingChangeStream resumeAfter(Object token) {
+
+ Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
+ return withOptions(builder -> builder.resumeAfter((BsonValue) token));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#startAfter(java.lang.Object)
+ */
+ @Override
+ public TerminatingChangeStream startAfter(Object token) {
+
+ Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");
+ return withOptions(builder -> builder.startAfter((BsonValue) token));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithOptions#withOptions(java.util.function.Consumer)
+ */
+ @Override
+ public ReactiveChangeStreamSupport withOptions(Consumer optionsConsumer) {
+
+ ChangeStreamOptionsBuilder builder = initOptionsBuilder();
+ optionsConsumer.accept(builder);
+
+ return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, builder.build());
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithProjection#as(java.lang.Class)
+ */
+ @Override
+ public ChangeStreamWithFilterAndProjection as(Class resultType) {
+
+ Assert.notNull(resultType, "ResultType must not be null!");
+ return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.aggregation.Aggregation)
+ */
+ @Override
+ public ChangeStreamWithFilterAndProjection filter(Aggregation filter) {
+ return withOptions(builder -> builder.filter(filter));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.query.CriteriaDefinition)
+ */
+ @Override
+ public ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by) {
+
+ MatchOperation $match = Aggregation.match(by);
+ Aggregation aggregation = domainType != null && !Document.class.equals(domainType)
+ ? Aggregation.newAggregation(domainType, $match)
+ : Aggregation.newAggregation($match);
+ return filter(aggregation);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.TerminatingChangeStream#listen()
+ */
+ @Override
+ public Flux> listen() {
+ return template.changeStream(collection, options != null ? options : ChangeStreamOptions.empty(), returnType);
+ }
+
+ private ChangeStreamOptionsBuilder initOptionsBuilder() {
+
+ ChangeStreamOptionsBuilder builder = ChangeStreamOptions.builder();
+ if (options == null) {
+ return builder;
+ }
+
+ options.getFilter().ifPresent(it -> {
+ if (it instanceof Aggregation) {
+ builder.filter((Aggregation) it);
+ } else {
+ builder.filter(((List) it).toArray(new Document[0]));
+ }
+ });
+ options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup);
+ options.getCollation().ifPresent(builder::collation);
+
+ if (options.isResumeAfter()) {
+ options.getResumeToken().ifPresent(builder::resumeAfter);
+ options.getResumeBsonTimestamp().ifPresent(builder::resumeAfter);
+ } else if (options.isStartAfter()) {
+ options.getResumeToken().ifPresent(builder::startAfter);
+ } else {
+ options.getResumeTimestamp().ifPresent(builder::resumeAt);
+ options.getResumeBsonTimestamp().ifPresent(builder::resumeAt);
+ }
+ return builder;
+
+ }
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java
index d493cac3b4..efb1d8a35c 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java
@@ -23,4 +23,4 @@
* @since 2.0
*/
public interface ReactiveFluentMongoOperations extends ReactiveFindOperation, ReactiveInsertOperation,
- ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation {}
+ ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation, ReactiveChangeStreamOperation {}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index ec0d5b663d..6faac53792 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -2269,6 +2269,15 @@ public ReactiveMapReduce mapReduce(Class domainType) {
return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType);
}
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class)
+ */
+ @Override
+ public ReactiveChangeStream changeStream(Class domainType) {
+ return new ReactiveChangeStreamOperationSupport(this).changeStream(domainType);
+ }
+
/**
* Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)}
* and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is
diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt
new file mode 100644
index 0000000000..bb89842fd4
--- /dev/null
+++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core
+
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.reactive.flow.asFlow
+
+
+/**
+ * Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters.
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+inline fun ReactiveChangeStreamOperation.changeStream(): ReactiveChangeStreamOperation.ReactiveChangeStream =
+ changeStream(T::class.java)
+
+/**
+ * Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters.
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+inline fun ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<*>.asType(): ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection =
+ `as`(T::class.java)
+
+/**
+ * Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen].
+ *
+ * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
+ * and [org.reactivestreams.Subscription.request] size.
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+@FlowPreview
+fun ReactiveChangeStreamOperation.TerminatingChangeStream.flow(batchSize: Int = 1): Flow> =
+ listen().asFlow(batchSize)
+
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java
new file mode 100644
index 0000000000..abaf1141f5
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import static org.springframework.data.mongodb.core.query.Criteria.*;
+
+import lombok.SneakyThrows;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import org.assertj.core.api.Assertions;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.springframework.data.mongodb.test.util.MongoTestUtils;
+import org.springframework.data.mongodb.test.util.ReplicaSet;
+
+import com.mongodb.reactivestreams.client.MongoClient;
+
+/**
+ * @author Christoph Strobl
+ * @currentRead Dawn Cook - The Decoy Princess
+ */
+public class ReactiveChangeStreamOperationSupportTests {
+
+ public static @ClassRule ReplicaSet REPL_SET_REQUIRED = ReplicaSet.required();
+ static final String DATABASE_NAME = "rx-change-stream";
+
+ MongoClient client;
+ ReactiveMongoTemplate template;
+
+ @Before
+ public void setUp() {
+
+ client = MongoTestUtils.reactiveReplSetClient();
+ template = new ReactiveMongoTemplate(client, DATABASE_NAME);
+
+ MongoTestUtils.createOrReplaceCollectionNow(DATABASE_NAME, "person", client);
+ }
+
+ @After
+ public void tearDown() {
+ MongoTestUtils.dropCollectionNow(DATABASE_NAME, "person", client);
+ }
+
+ @SneakyThrows
+ @Test // DATAMONGO-2089
+ public void changeStreamEventsShouldBeEmittedCorrectly() {
+
+ BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+
+ Disposable disposable = template.changeStream(Document.class) //
+ .watchCollection("person") //
+ .listen() //
+ .doOnNext(documents::add).subscribe();
+
+ Thread.sleep(500); // just give it some time to link to the collection.
+
+ Person person1 = new Person("Spring", 38);
+ Person person2 = new Person("Data", 39);
+ Person person3 = new Person("MongoDB", 37);
+
+ Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)),
+ template.insert(person2).delayElement(Duration.ofMillis(2)),
+ template.insert(person3).delayElement(Duration.ofMillis(2))) //
+ .as(StepVerifier::create) //
+ .expectNextCount(3) //
+ .verifyComplete();
+
+ Thread.sleep(500); // just give it some time to link receive all events
+
+ try {
+ Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
+ .allMatch(val -> val instanceof Document);
+ } finally {
+ disposable.dispose();
+ }
+ }
+
+ @Test // DATAMONGO-1803
+ public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedException {
+
+ BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+
+ Disposable disposable = template.changeStream(Person.class).listen() //
+ .doOnNext(documents::add).subscribe();
+
+ Thread.sleep(500); // just give it some time to link to the collection.
+
+ Person person1 = new Person("Spring", 38);
+ Person person2 = new Person("Data", 39);
+ Person person3 = new Person("MongoDB", 37);
+
+ Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)),
+ template.insert(person2).delayElement(Duration.ofMillis(2)),
+ template.insert(person3).delayElement(Duration.ofMillis(2))) //
+ .as(StepVerifier::create) //
+ .expectNextCount(3) //
+ .verifyComplete();
+
+ Thread.sleep(500); // just give it some time to link receive all events
+
+ try {
+ Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
+ .containsExactly(person1, person2, person3);
+ } finally {
+ disposable.dispose();
+ }
+ }
+
+ @Test // DATAMONGO-1803
+ public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedException {
+
+ BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+
+ Disposable disposable = template.changeStream(Person.class) //
+ .watchCollection("person") //
+ .filter(where("age").gte(38)) //
+ .listen() //
+ .doOnNext(documents::add).subscribe();
+
+ Thread.sleep(500); // just give it some time to link to the collection.
+
+ Person person1 = new Person("Spring", 38);
+ Person person2 = new Person("Data", 37);
+ Person person3 = new Person("MongoDB", 39);
+
+ Flux.merge(template.save(person1).delayElement(Duration.ofMillis(2)),
+ template.save(person2).delayElement(Duration.ofMillis(2)),
+ template.save(person3).delayElement(Duration.ofMillis(2))) //
+ .as(StepVerifier::create) //
+ .expectNextCount(3) //
+ .verifyComplete();
+
+ Thread.sleep(500); // just give it some time to link receive all events
+
+ try {
+ Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
+ .containsExactly(person1, person3);
+ } finally {
+ disposable.dispose();
+ }
+ }
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java
new file mode 100644
index 0000000000..d42cb7568f
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.isNull;
+import static org.springframework.data.mongodb.core.query.Criteria.*;
+
+import reactor.core.publisher.Flux;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
+import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
+import org.springframework.data.mongodb.core.query.Criteria;
+
+/**
+ * @author Christoph Strobl
+ * @currentRead Dawn Cook - The Decoy Princess
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ReactiveChangeStreamOperationSupportUnitTests {
+
+ @Mock ReactiveMongoTemplate template;
+ ReactiveChangeStreamOperationSupport changeStreamSupport;
+
+ @Before
+ public void setUp() {
+ when(template.changeStream(any(), any(), any())).thenReturn(Flux.empty());
+ changeStreamSupport = new ReactiveChangeStreamOperationSupport(template);
+ }
+
+ @Test // DATAMONGO-2089
+ public void listenWithoutDomainTypeUsesDocumentAsDefault() {
+
+ changeStreamSupport.changeStream(Document.class).listen().subscribe();
+
+ verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Document.class));
+ }
+
+ @Test // DATAMONGO-2089
+ public void listenWithDomainTypeUsesSourceAsTarget() {
+
+ changeStreamSupport.changeStream(Person.class).listen().subscribe();
+
+ verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Person.class));
+ }
+
+ @Test // DATAMONGO-2089
+ public void collectionNameIsPassedOnCorrectly() {
+
+ changeStreamSupport.changeStream(Person.class).watchCollection("star-wars").listen().subscribe();
+
+ verify(template).changeStream(eq("star-wars"), eq(ChangeStreamOptions.empty()), eq(Person.class));
+ }
+
+ @Test // DATAMONGO-2089
+ public void listenWithDomainTypeCreatesTypedAggregation() {
+
+ Criteria criteria = where("operationType").is("insert");
+ changeStreamSupport.changeStream(Person.class).filter(criteria).listen().subscribe();
+
+ ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class);
+ verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Person.class));
+
+ assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> {
+
+ assertThat(it).isInstanceOf(TypedAggregation.class);
+ TypedAggregation> aggregation = (TypedAggregation>) it;
+
+ assertThat(aggregation.getInputType()).isEqualTo(Person.class);
+ assertThat(extractPipeline(aggregation))
+ .containsExactly(new Document("$match", new Document("operationType", "insert")));
+ });
+ }
+
+ @Test // DATAMONGO-2089
+ public void listenWithoutDomainTypeCreatesUntypedAggregation() {
+
+ Criteria criteria = where("operationType").is("insert");
+ changeStreamSupport.changeStream(Document.class).filter(criteria).listen().subscribe();
+
+ ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class);
+ verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class));
+
+ assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> {
+
+ assertThat(it).isInstanceOf(Aggregation.class);
+ assertThat(it).isNotInstanceOf(TypedAggregation.class);
+
+ Aggregation aggregation = (Aggregation) it;
+
+ assertThat(extractPipeline(aggregation))
+ .containsExactly(new Document("$match", new Document("operationType", "insert")));
+ });
+ }
+
+ @Test // DATAMONGO-2089
+ public void optionsShouldBePassedOnCorrectly() {
+
+ Document filter = new Document("$match", new Document("operationType", "insert"));
+
+ changeStreamSupport.changeStream(Document.class).withOptions(options -> {
+ options.filter(filter);
+ }).listen().subscribe();
+
+ ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class);
+ verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class));
+
+ assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> {
+ assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter));
+ });
+ }
+
+ @Test // DATAMONGO-2089
+ public void optionsShouldBeCombinedCorrectly() {
+
+ Document filter = new Document("$match", new Document("operationType", "insert"));
+ Instant resumeTimestamp = Instant.now();
+
+ changeStreamSupport.changeStream(Document.class).withOptions(options -> {
+ options.filter(filter);
+ }).resumeAt(resumeTimestamp).listen().subscribe();
+
+ ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class);
+ verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class));
+
+ assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> {
+
+ assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter));
+ assertThat(it.getResumeTimestamp()).contains(resumeTimestamp);
+ });
+ }
+
+ private static List extractPipeline(Aggregation aggregation) {
+ return aggregation.toDocument("person", Aggregation.DEFAULT_CONTEXT).get("pipeline", ArrayList.class);
+ }
+}
diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt
new file mode 100644
index 0000000000..f6c67780da
--- /dev/null
+++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core
+
+import example.first.First
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.runBlocking
+import org.assertj.core.api.Assertions.assertThat
+import org.bson.Document
+import org.junit.Test
+import reactor.core.publisher.Flux
+
+/**
+ * @author Christoph Strobl
+ * @soundtrack Rage Against The Machine - Take the Power Back
+ */
+class ReactiveChangeStreamOperationExtensionsTests {
+
+ val operation = mockk(relaxed = true)
+
+ @Test // DATAMONGO-2089
+ fun `ReactiveChangeStreamOperation#changeStream() with reified type parameter extension should call its Java counterpart`() {
+
+ operation.changeStream()
+ verify { operation.changeStream(First::class.java) }
+ }
+
+ @Test // DATAMONGO-2089
+ @FlowPreview
+ fun `TerminatingChangeStream#listen() flow extension`() {
+
+ val doc1 = mockk>()
+ val doc2 = mockk>()
+ val doc3 = mockk>()
+
+ val spec = mockk>()
+ every { spec.listen() } returns Flux.just(doc1, doc2, doc3)
+
+ runBlocking {
+ assertThat(spec.flow().toList()).contains(doc1, doc2, doc3)
+ }
+
+ verify {
+ spec.listen()
+ }
+ }
+}
diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc
index c18bd0dd38..e794b388d7 100644
--- a/src/main/asciidoc/reference/change-streams.adoc
+++ b/src/main/asciidoc/reference/change-streams.adoc
@@ -51,14 +51,14 @@ Subscribing to Change Streams with the reactive API is a more natural approach t
====
[source,java]
----
-ChangeStreamOptions options = ChangeStreamOptions.builder()
- .filter(newAggregation(User.class, match(where("age").gte(38))) <1>
- .build();
-
-Flux> flux = reactiveTemplate.changeStream("user", options, User.class); <2>
+Flux> flux = reactiveTemplate.changeStream(User.class) <1>
+ .watchCollection("persons")
+ .filter(where("age").gte(38)) <2>
+ .listen(); <3>
----
-<1> Use an aggregation pipeline to filter events.
-<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
+<1> The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion.
+<2> Use an aggregation pipeline or just a query `Criteria` to filter events.
+<3> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type from (2).
====
=== Resuming Change Streams
@@ -72,11 +72,11 @@ The following example shows how to set the resume offset using server time:
====
[source,java]
----
-ChangeStreamOptions = ChangeStreamOptions.builder()
- .resumeAt(Instant.now().minusSeconds(1)) <1>
- .build()
-
-Flux> resumed = template.changeStream("person", options, User.class)
+Flux> resumed = template.changeStream()
+ .watchCollection("persons")
+ .as(User.class)
+ .resumeAt(Instant.now().minusSeconds(1)) <1>
+ .listen();
----
<1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken`
exposed through `getResumeToken`.