From 06422fc1137f41a34da33008b6c97ffc684ce31e Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 15 May 2019 15:16:35 +0200 Subject: [PATCH 1/3] DATAMONGO-2089 - Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 340e6922b4..cf7c2a99d7 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index bb7d9f03cc..6f8d9a6f8e 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index b32dcba387..49891d9610 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index b611cf01a8..3fb5cfeb32 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml From d76456fb1de39378e79e08a66f62177b645540e7 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 16 May 2019 15:21:01 +0200 Subject: [PATCH 2/3] DATAMONGO-2089 - Add fluent change stream API to ReactiveMongoTemplate. We now offer a fluent API for more intuitive change stream interaction. Flux> flux = reactiveTemplate.changeStream() .watchCollection("persons") .filter(where("age").gte(38)) .as(User.class) .listen(); --- .../core/ReactiveChangeStreamOperation.java | 208 +++++++++++++++++ .../ReactiveChangeStreamOperationSupport.java | 221 ++++++++++++++++++ .../core/ReactiveFluentMongoOperations.java | 2 +- .../mongodb/core/ReactiveMongoTemplate.java | 18 ++ ...tiveChangeStreamOperationSupportTests.java | 165 +++++++++++++ ...ChangeStreamOperationSupportUnitTests.java | 164 +++++++++++++ .../asciidoc/reference/change-streams.adoc | 25 +- 7 files changed, 790 insertions(+), 13 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java new file mode 100644 index 0000000000..a84fd59d9c --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java @@ -0,0 +1,208 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.function.Consumer; + +import org.bson.BsonTimestamp; +import org.bson.BsonValue; +import org.bson.Document; +import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder; +import org.springframework.data.mongodb.core.ReactiveFindOperation.ReactiveFind; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; + +/** + * {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB + * Change Stream operations in a fluent API * style.
+ * The starting {@literal domainType} is used for mapping a potentially given + * {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the + * originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}. + * However, it is possible to define an different {@literal returnType} via {@code as}.
+ * The collection to operate on is optional in which case call collection with the actual database are watched, use + * {@literal watchCollection} to define a fixed collection. + * + *
+ *     
+ *         changeStream(Human.class)
+ *             .watchCollection("star-wars")
+ *             .filter(where("operationType").is("insert"))
+ *             .as(Jedi.class)
+ *             .resumeAt(Instant.now())
+ *             .listen();
+ *     
+ * 
+ * + * @author Christoph Strobl + * @since 2.2 + */ +public interface ReactiveChangeStreamOperation { + + /** + * Start creating a change stream operation.watching all collections within the database.
+ * Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or + * {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}. + * + * @return new instance of {@link ReactiveFind}. Never {@literal null}. + */ + ReactiveChangeStream changeStream(); + + /** + * Start creating a change stream operation for the given {@literal domainType} watching all collections within the + * database.
+ * Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or + * {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}. + * + * @param domainType must not be {@literal null}. + * @return new instance of {@link ReactiveChangeStream}. Never {@literal null}. + * @throws IllegalArgumentException if domainType is {@literal null}. + */ + ReactiveChangeStream changeStream(Class domainType); + + /** + * Compose change stream execution by calling one of the terminating methods. + */ + interface TerminatingChangeStream { + + /** + * Start listening to changes. The stream will not be completed unless the {@link org.reactivestreams.Subscription} + * is {@link org.reactivestreams.Subscription#cancel() canceled}. + *

+ * However, the stream may become dead, or invalid, if all watched collections, databases are dropped. + */ + Flux> listen(); + } + + /** + * Collection override (optional). + */ + interface ChangeStreamWithCollection { + + /** + * Explicitly set the name of the collection to watch.
+ * Skip this step to watch all collections within the database. + * + * @param collection must not be {@literal null} nor {@literal empty}. + * @return new instance of {@link ChangeStreamWithCollection}. + * @throws IllegalArgumentException if collection is {@literal null}. + */ + ChangeStreamWithProjection watchCollection(String collection); + } + + /** + * Result type override (optional). + */ + interface ChangeStreamWithProjection extends ChangeStreamWithFilter { + + /** + * Define the target type fields should be mapped to. + * + * @param resultType must not be {@literal null}. + * @param result type. + * @return new instance of {@link ChangeStreamWithProjection}. + * @throws IllegalArgumentException if resultType is {@literal null}. + */ + ChangeStreamWithFilter as(Class resultType); + } + + /** + * Provide a filter for limiting results (optional). + */ + interface ChangeStreamWithFilter extends ResumingChangeStream, TerminatingChangeStream { + + /** + * Use an {@link Aggregation} to filter matching events. + * + * @param by must not be {@literal null}. + * @return new instance of {@link ChangeStreamWithProjection}. + * @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}. + */ + ChangeStreamWithProjection filter(Aggregation by); + + /** + * Use a {@link CriteriaDefinition critera} to filter matching events via an + * {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}. + * + * @param by must not be {@literal null}. + * @return new instance of {@link ChangeStreamWithProjection}. + * @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}. + */ + ChangeStreamWithProjection filter(CriteriaDefinition by); + } + + /** + * Resume a change stream. (optional). + */ + interface ResumingChangeStream extends TerminatingChangeStream { + + /** + * Resume the change stream at a given point. + * + * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#resumeAt(Instant) + * @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp) + * @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}. + */ + TerminatingChangeStream resumeAt(Object beacon); + + /** + * Resume the change stream after a given point. + * + * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue) + * @see ChangeStreamOptionsBuilder#resumeToken(BsonValue) + * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. + */ + TerminatingChangeStream resumeAfter(Object beacon); + + /** + * Start the change stream after a given point. + * + * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue) + * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. + */ + TerminatingChangeStream startAfter(Object beacon); + } + + /** + * Provide some options. + */ + interface ChangeStreamWithOptions { + + /** + * Provide some options via the callback by modifying the given {@link ChangeStreamOptionsBuilder}. Previously + * defined options like a {@link ResumingChangeStream#resumeAfter(Object) resumeToken} are carried over to the + * builder and can be overwritten via eg. {@link ChangeStreamOptionsBuilder#resumeToken(BsonValue)}. + * + * @param optionsConsumer never {@literal null}. + * @return new instance of {@link ReactiveChangeStream}. + */ + ReactiveChangeStream withOptions(Consumer optionsConsumer); + } + + /** + * {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way. + */ + interface ReactiveChangeStream extends ChangeStreamWithOptions, ChangeStreamWithCollection, + TerminatingChangeStream, ResumingChangeStream, ChangeStreamWithFilter {} +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java new file mode 100644 index 0000000000..7e898e445c --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -0,0 +1,221 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.List; +import java.util.function.Consumer; + +import org.bson.BsonTimestamp; +import org.bson.BsonValue; +import org.bson.Document; +import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.MatchOperation; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * @author Christoph Strobl + * @since 2.2 + */ +class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation { + + private final ReactiveMongoTemplate template; + + /** + * @param template must not be {@literal null}. + */ + ReactiveChangeStreamOperationSupport(ReactiveMongoTemplate template) { + this.template = template; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream() + */ + @Override + public ReactiveChangeStream changeStream() { + return new ReactiveChangeStreamSupport(template, null, Document.class, null, null); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) + */ + @Override + public ReactiveChangeStream changeStream(Class domainType) { + + Assert.notNull(domainType, "DomainType must not be null!"); + return new ReactiveChangeStreamSupport(template, domainType, domainType, null, null); + } + + static class ReactiveChangeStreamSupport + implements ReactiveChangeStream, ChangeStreamWithProjection, ChangeStreamWithFilter { + + private final ReactiveMongoTemplate template; + private final @Nullable Class domainType; + private final Class returnType; + private final @Nullable String collection; + private final @Nullable ChangeStreamOptions options; + + private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class domainType, Class returnType, + @Nullable String collection, @Nullable ChangeStreamOptions options) { + + this.template = template; + this.domainType = domainType; + this.returnType = returnType; + this.collection = collection; + this.options = options; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.String) + */ + @Override + public ChangeStreamWithProjection watchCollection(String collection) { + + Assert.hasText(collection, "Collection name must not be null nor empty!"); + return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object) + */ + @Override + public TerminatingChangeStream resumeAt(Object beacon) { + + return withOptions(builder -> { + + if (beacon instanceof Instant) { + builder.resumeAt((Instant) beacon); + } else if (beacon instanceof BsonTimestamp) { + builder.resumeAt((BsonTimestamp) beacon); + } + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAfter(java.lang.Object) + */ + @Override + public TerminatingChangeStream resumeAfter(Object beacon) { + + Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue"); + return withOptions(builder -> builder.resumeAfter((BsonValue) beacon)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#startAfter(java.lang.Object) + */ + @Override + public TerminatingChangeStream startAfter(Object beacon) { + + Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue"); + return withOptions(builder -> builder.startAfter((BsonValue) beacon)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithOptions#withOptions(java.util.function.Consumer) + */ + @Override + public ReactiveChangeStreamSupport withOptions(Consumer optionsConsumer) { + + ChangeStreamOptionsBuilder builder = initOptionsBuilder(); + optionsConsumer.accept(builder); + + return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, builder.build()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithProjection#as(java.lang.Class) + */ + @Override + public ChangeStreamWithFilter as(Class resultType) { + return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.aggregation.Aggregation) + */ + @Override + public ChangeStreamWithProjection filter(Aggregation aggregation) { + return withOptions(builder -> builder.filter(aggregation)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.query.CriteriaDefinition) + */ + @Override + public ChangeStreamWithProjection filter(CriteriaDefinition by) { + + MatchOperation $match = Aggregation.match(by); + Aggregation aggregation = domainType != null ? Aggregation.newAggregation(domainType, $match) + : Aggregation.newAggregation($match); + return filter(aggregation); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.TerminatingChangeStream#listen() + */ + @Override + public Flux> listen() { + return template.changeStream(collection, options != null ? options : ChangeStreamOptions.empty(), returnType); + } + + private ChangeStreamOptionsBuilder initOptionsBuilder() { + + ChangeStreamOptionsBuilder builder = ChangeStreamOptions.builder(); + if (options == null) { + return builder; + } + + options.getFilter().ifPresent(it -> { + if (it instanceof Aggregation) { + builder.filter((Aggregation) it); + } else { + builder.filter(((List) it).toArray(new Document[0])); + } + }); + options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getCollation().ifPresent(builder::collation); + + if (options.isResumeAfter()) { + options.getResumeToken().ifPresent(builder::resumeAfter); + options.getResumeBsonTimestamp().ifPresent(builder::resumeAfter); + } else if (options.isStartAfter()) { + options.getResumeToken().ifPresent(builder::startAfter); + } else { + options.getResumeTimestamp().ifPresent(builder::resumeAt); + options.getResumeBsonTimestamp().ifPresent(builder::resumeAt); + } + return builder; + + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java index d493cac3b4..efb1d8a35c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java @@ -23,4 +23,4 @@ * @since 2.0 */ public interface ReactiveFluentMongoOperations extends ReactiveFindOperation, ReactiveInsertOperation, - ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation {} + ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation, ReactiveChangeStreamOperation {} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index ec0d5b663d..8b442ab8fb 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2269,6 +2269,24 @@ public ReactiveMapReduce mapReduce(Class domainType) { return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType); } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream() + */ + @Override + public ReactiveChangeStream changeStream() { + return new ReactiveChangeStreamOperationSupport(this).changeStream(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) + */ + @Override + public ReactiveChangeStream changeStream(Class domainType) { + return new ReactiveChangeStreamOperationSupport(this).changeStream(domainType); + } + /** * Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)} * and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java new file mode 100644 index 0000000000..bd75b3e8b7 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java @@ -0,0 +1,165 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.springframework.data.mongodb.core.query.Criteria.*; + +import lombok.SneakyThrows; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import org.assertj.core.api.Assertions; +import org.bson.Document; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.springframework.data.mongodb.test.util.MongoTestUtils; +import org.springframework.data.mongodb.test.util.ReplicaSet; + +import com.mongodb.reactivestreams.client.MongoClient; + +/** + * @author Christoph Strobl + * @currentRead Dawn Cook - The Decoy Princess + */ +public class ReactiveChangeStreamOperationSupportTests { + + public static @ClassRule ReplicaSet REPL_SET_REQUIRED = ReplicaSet.required(); + static final String DATABASE_NAME = "rx-change-stream"; + + MongoClient client; + ReactiveMongoTemplate template; + + @Before + public void setUp() { + + client = MongoTestUtils.reactiveReplSetClient(); + template = new ReactiveMongoTemplate(client, DATABASE_NAME); + + MongoTestUtils.createOrReplaceCollectionNow(DATABASE_NAME, "person", client); + } + + @After + public void tearDown() { + MongoTestUtils.dropCollectionNow(DATABASE_NAME, "person", client); + } + + @SneakyThrows + @Test // DATAMONGO-2089 + public void changeStreamEventsShouldBeEmittedCorrectly() { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream() // + .watchCollection("person") // + .listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 39); + Person person3 = new Person("MongoDB", 37); + + Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)), + template.insert(person2).delayElement(Duration.ofMillis(2)), + template.insert(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) + .allMatch(val -> val instanceof Document); + } finally { + disposable.dispose(); + } + } + + @Test // DATAMONGO-1803 + public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedException { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream(Person.class).listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 39); + Person person3 = new Person("MongoDB", 37); + + Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)), + template.insert(person2).delayElement(Duration.ofMillis(2)), + template.insert(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + .containsExactly(person1, person2, person3); + } finally { + disposable.dispose(); + } + } + + @Test // DATAMONGO-1803 + public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedException { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream(Person.class) // + .watchCollection("person") // + .filter(where("age").gte(38)) // + .listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 37); + Person person3 = new Person("MongoDB", 39); + + Flux.merge(template.save(person1).delayElement(Duration.ofMillis(2)), + template.save(person2).delayElement(Duration.ofMillis(2)), + template.save(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + .containsExactly(person1, person3); + } finally { + disposable.dispose(); + } + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java new file mode 100644 index 0000000000..ad8124083f --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java @@ -0,0 +1,164 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.isNull; +import static org.springframework.data.mongodb.core.query.Criteria.*; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.bson.Document; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.TypedAggregation; +import org.springframework.data.mongodb.core.query.Criteria; + +/** + * @author Christoph Strobl + * @currentRead Dawn Cook - The Decoy Princess + */ +@RunWith(MockitoJUnitRunner.class) +public class ReactiveChangeStreamOperationSupportUnitTests { + + @Mock ReactiveMongoTemplate template; + ReactiveChangeStreamOperationSupport changeStreamSupport; + + @Before + public void setUp() { + when(template.changeStream(any(), any(), any())).thenReturn(Flux.empty()); + changeStreamSupport = new ReactiveChangeStreamOperationSupport(template); + } + + @Test // DATAMONGO-2089 + public void listenWithoutDomainTypeUsesDocumentAsDefault() { + + changeStreamSupport.changeStream().listen().subscribe(); + + verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Document.class)); + } + + @Test // DATAMONGO-2089 + public void listenWithDomainTypeUsesSourceAsTarget() { + + changeStreamSupport.changeStream(Person.class).listen().subscribe(); + + verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Person.class)); + } + + @Test // DATAMONGO-2089 + public void collectionNameIsPassedOnCorrectly() { + + changeStreamSupport.changeStream(Person.class).watchCollection("star-wars").listen().subscribe(); + + verify(template).changeStream(eq("star-wars"), eq(ChangeStreamOptions.empty()), eq(Person.class)); + } + + @Test // DATAMONGO-2089 + public void listenWithDomainTypeCreatesTypedAggregation() { + + Criteria criteria = where("operationType").is("insert"); + changeStreamSupport.changeStream(Person.class).filter(criteria).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Person.class)); + + assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> { + + assertThat(it).isInstanceOf(TypedAggregation.class); + TypedAggregation aggregation = (TypedAggregation) it; + + assertThat(aggregation.getInputType()).isEqualTo(Person.class); + assertThat(extractPipeline(aggregation)) + .containsExactly(new Document("$match", new Document("operationType", "insert"))); + }); + } + + @Test // DATAMONGO-2089 + public void listenWithoutDomainTypeCreatesUntypedAggregation() { + + Criteria criteria = where("operationType").is("insert"); + changeStreamSupport.changeStream().filter(criteria).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> { + + assertThat(it).isInstanceOf(Aggregation.class); + assertThat(it).isNotInstanceOf(TypedAggregation.class); + + Aggregation aggregation = (Aggregation) it; + + assertThat(extractPipeline(aggregation)) + .containsExactly(new Document("$match", new Document("operationType", "insert"))); + }); + } + + @Test // DATAMONGO-2089 + public void optionsShouldBePassedOnCorrectly() { + + Document filter = new Document("$match", new Document("operationType", "insert")); + + changeStreamSupport.changeStream().withOptions(options -> { + options.filter(filter); + }).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> { + assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter)); + }); + } + + @Test // DATAMONGO-2089 + public void optionsShouldBeCombinedCorrectly() { + + Document filter = new Document("$match", new Document("operationType", "insert")); + Instant resumeTimestamp = Instant.now(); + + changeStreamSupport.changeStream().withOptions(options -> { + options.filter(filter); + }).resumeAt(resumeTimestamp).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> { + + assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter)); + assertThat(it.getResumeTimestamp()).contains(resumeTimestamp); + }); + } + + private static List extractPipeline(Aggregation aggregation) { + return aggregation.toDocument("person", Aggregation.DEFAULT_CONTEXT).get("pipeline", ArrayList.class); + } +} diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc index c18bd0dd38..3d3df7ccc1 100644 --- a/src/main/asciidoc/reference/change-streams.adoc +++ b/src/main/asciidoc/reference/change-streams.adoc @@ -51,14 +51,15 @@ Subscribing to Change Streams with the reactive API is a more natural approach t ==== [source,java] ---- -ChangeStreamOptions options = ChangeStreamOptions.builder() - .filter(newAggregation(User.class, match(where("age").gte(38))) <1> - .build(); - -Flux> flux = reactiveTemplate.changeStream("user", options, User.class); <2> +Flux> flux = reactiveTemplate.changeStream() + .watchCollection("persons") + .filter(where("age").gte(38)) <1> + .as(User.class) <2> + .listen(); <3> ---- -<1> Use an aggregation pipeline to filter events. -<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion. +<1> Use an aggregation pipeline or just a query `Criteria` to filter events. +<2> The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. +<3> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type from (2). ==== === Resuming Change Streams @@ -72,11 +73,11 @@ The following example shows how to set the resume offset using server time: ==== [source,java] ---- -ChangeStreamOptions = ChangeStreamOptions.builder() - .resumeAt(Instant.now().minusSeconds(1)) <1> - .build() - -Flux> resumed = template.changeStream("person", options, User.class) +Flux> resumed = template.changeStream() + .watchCollection("persons") + .as(User.class) + .resumeAt(Instant.now().minusSeconds(1)) <1> + .listen(); ---- <1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken` exposed through `getResumeToken`. From 03531a20bee42e1602ae2b977f9bd83dbb2a8fe8 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 18 Jun 2019 11:54:15 +0200 Subject: [PATCH 3/3] DATAMONGO-2089 - Refactor and add Kotlin extension. --- .../core/ReactiveChangeStreamOperation.java | 67 +++++++------------ .../ReactiveChangeStreamOperationSupport.java | 48 ++++++------- .../mongodb/core/ReactiveMongoTemplate.java | 9 --- ...ReactiveChangeStreamOperationExtensions.kt | 53 +++++++++++++++ ...tiveChangeStreamOperationSupportTests.java | 2 +- ...ChangeStreamOperationSupportUnitTests.java | 8 +-- ...iveChangeStreamOperationExtensionsTests.kt | 64 ++++++++++++++++++ .../asciidoc/reference/change-streams.adoc | 11 ++- 8 files changed, 173 insertions(+), 89 deletions(-) create mode 100644 spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt create mode 100644 spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java index a84fd59d9c..521cd9569e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java @@ -22,9 +22,7 @@ import org.bson.BsonTimestamp; import org.bson.BsonValue; -import org.bson.Document; import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder; -import org.springframework.data.mongodb.core.ReactiveFindOperation.ReactiveFind; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.query.CriteriaDefinition; @@ -54,22 +52,13 @@ */ public interface ReactiveChangeStreamOperation { - /** - * Start creating a change stream operation.watching all collections within the database.
- * Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or - * {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}. - * - * @return new instance of {@link ReactiveFind}. Never {@literal null}. - */ - ReactiveChangeStream changeStream(); - /** * Start creating a change stream operation for the given {@literal domainType} watching all collections within the * database.
* Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or - * {@link ChangeStreamWithFilter#filter(CriteriaDefinition) filter}. + * {@link ChangeStreamWithFilterAndProjection#filter(CriteriaDefinition) filter}. * - * @param domainType must not be {@literal null}. + * @param domainType must not be {@literal null}. Use {@link org.bson.Document} to obtain raw elements. * @return new instance of {@link ReactiveChangeStream}. Never {@literal null}. * @throws IllegalArgumentException if domainType is {@literal null}. */ @@ -102,48 +91,42 @@ interface ChangeStreamWithCollection { * @return new instance of {@link ChangeStreamWithCollection}. * @throws IllegalArgumentException if collection is {@literal null}. */ - ChangeStreamWithProjection watchCollection(String collection); - } - - /** - * Result type override (optional). - */ - interface ChangeStreamWithProjection extends ChangeStreamWithFilter { - - /** - * Define the target type fields should be mapped to. - * - * @param resultType must not be {@literal null}. - * @param result type. - * @return new instance of {@link ChangeStreamWithProjection}. - * @throws IllegalArgumentException if resultType is {@literal null}. - */ - ChangeStreamWithFilter as(Class resultType); + ChangeStreamWithFilterAndProjection watchCollection(String collection); } /** * Provide a filter for limiting results (optional). */ - interface ChangeStreamWithFilter extends ResumingChangeStream, TerminatingChangeStream { + interface ChangeStreamWithFilterAndProjection extends ResumingChangeStream, TerminatingChangeStream { /** * Use an {@link Aggregation} to filter matching events. * * @param by must not be {@literal null}. - * @return new instance of {@link ChangeStreamWithProjection}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. * @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}. */ - ChangeStreamWithProjection filter(Aggregation by); + ChangeStreamWithFilterAndProjection filter(Aggregation by); /** * Use a {@link CriteriaDefinition critera} to filter matching events via an * {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}. * * @param by must not be {@literal null}. - * @return new instance of {@link ChangeStreamWithProjection}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. * @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}. */ - ChangeStreamWithProjection filter(CriteriaDefinition by); + ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by); + + /** + * Define the target type fields should be mapped to. + * + * @param resultType must not be {@literal null}. + * @param result type. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if resultType is {@literal null}. + */ + ChangeStreamWithFilterAndProjection as(Class resultType); } /** @@ -154,34 +137,34 @@ interface ResumingChangeStream extends TerminatingChangeStream { /** * Resume the change stream at a given point. * - * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @param token an {@link Instant} or {@link BsonTimestamp} * @return new instance of {@link TerminatingChangeStream}. * @see ChangeStreamOptionsBuilder#resumeAt(Instant) * @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp) * @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}. */ - TerminatingChangeStream resumeAt(Object beacon); + TerminatingChangeStream resumeAt(Object token); /** * Resume the change stream after a given point. * - * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @param token an {@link Instant} or {@link BsonTimestamp} * @return new instance of {@link TerminatingChangeStream}. * @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue) * @see ChangeStreamOptionsBuilder#resumeToken(BsonValue) * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. */ - TerminatingChangeStream resumeAfter(Object beacon); + TerminatingChangeStream resumeAfter(Object token); /** * Start the change stream after a given point. * - * @param beacon an {@link Instant} or {@link BsonTimestamp} + * @param token an {@link Instant} or {@link BsonTimestamp} * @return new instance of {@link TerminatingChangeStream}. * @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue) * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. */ - TerminatingChangeStream startAfter(Object beacon); + TerminatingChangeStream startAfter(Object token); } /** @@ -204,5 +187,5 @@ interface ChangeStreamWithOptions { * {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way. */ interface ReactiveChangeStream extends ChangeStreamWithOptions, ChangeStreamWithCollection, - TerminatingChangeStream, ResumingChangeStream, ChangeStreamWithFilter {} + TerminatingChangeStream, ResumingChangeStream, ChangeStreamWithFilterAndProjection {} } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 7e898e445c..1090f8e417 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -46,15 +46,6 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat this.template = template; } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream() - */ - @Override - public ReactiveChangeStream changeStream() { - return new ReactiveChangeStreamSupport(template, null, Document.class, null, null); - } - /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) @@ -67,7 +58,7 @@ public ReactiveChangeStream changeStream(Class domainType) { } static class ReactiveChangeStreamSupport - implements ReactiveChangeStream, ChangeStreamWithProjection, ChangeStreamWithFilter { + implements ReactiveChangeStream, ChangeStreamWithFilterAndProjection { private final ReactiveMongoTemplate template; private final @Nullable Class domainType; @@ -90,7 +81,7 @@ private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class dom * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.String) */ @Override - public ChangeStreamWithProjection watchCollection(String collection) { + public ChangeStreamWithFilterAndProjection watchCollection(String collection) { Assert.hasText(collection, "Collection name must not be null nor empty!"); return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options); @@ -101,14 +92,14 @@ public ChangeStreamWithProjection watchCollection(String collection) { * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object) */ @Override - public TerminatingChangeStream resumeAt(Object beacon) { + public TerminatingChangeStream resumeAt(Object token) { return withOptions(builder -> { - if (beacon instanceof Instant) { - builder.resumeAt((Instant) beacon); - } else if (beacon instanceof BsonTimestamp) { - builder.resumeAt((BsonTimestamp) beacon); + if (token instanceof Instant) { + builder.resumeAt((Instant) token); + } else if (token instanceof BsonTimestamp) { + builder.resumeAt((BsonTimestamp) token); } }); } @@ -118,10 +109,10 @@ public TerminatingChangeStream resumeAt(Object beacon) { * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAfter(java.lang.Object) */ @Override - public TerminatingChangeStream resumeAfter(Object beacon) { + public TerminatingChangeStream resumeAfter(Object token) { - Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue"); - return withOptions(builder -> builder.resumeAfter((BsonValue) beacon)); + Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.resumeAfter((BsonValue) token)); } /* @@ -129,10 +120,10 @@ public TerminatingChangeStream resumeAfter(Object beacon) { * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#startAfter(java.lang.Object) */ @Override - public TerminatingChangeStream startAfter(Object beacon) { + public TerminatingChangeStream startAfter(Object token) { - Assert.isInstanceOf(BsonValue.class, beacon, "Beacon must be a BsonValue"); - return withOptions(builder -> builder.startAfter((BsonValue) beacon)); + Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.startAfter((BsonValue) token)); } /* @@ -153,7 +144,9 @@ public ReactiveChangeStreamSupport withOptions(Consumer ChangeStreamWithFilter as(Class resultType) { + public ChangeStreamWithFilterAndProjection as(Class resultType) { + + Assert.notNull(resultType, "ResultType must not be null!"); return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options); } @@ -162,8 +155,8 @@ public ChangeStreamWithFilter as(Class resultType) { * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.aggregation.Aggregation) */ @Override - public ChangeStreamWithProjection filter(Aggregation aggregation) { - return withOptions(builder -> builder.filter(aggregation)); + public ChangeStreamWithFilterAndProjection filter(Aggregation filter) { + return withOptions(builder -> builder.filter(filter)); } /* @@ -171,10 +164,11 @@ public ChangeStreamWithProjection filter(Aggregation aggregation) { * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.query.CriteriaDefinition) */ @Override - public ChangeStreamWithProjection filter(CriteriaDefinition by) { + public ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by) { MatchOperation $match = Aggregation.match(by); - Aggregation aggregation = domainType != null ? Aggregation.newAggregation(domainType, $match) + Aggregation aggregation = domainType != null && !Document.class.equals(domainType) + ? Aggregation.newAggregation(domainType, $match) : Aggregation.newAggregation($match); return filter(aggregation); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 8b442ab8fb..6faac53792 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2269,15 +2269,6 @@ public ReactiveMapReduce mapReduce(Class domainType) { return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType); } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream() - */ - @Override - public ReactiveChangeStream changeStream() { - return new ReactiveChangeStreamOperationSupport(this).changeStream(); - } - /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt new file mode 100644 index 0000000000..bb89842fd4 --- /dev/null +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.flow.asFlow + + +/** + * Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters. + * + * @author Christoph Strobl + * @since 2.2 + */ +inline fun ReactiveChangeStreamOperation.changeStream(): ReactiveChangeStreamOperation.ReactiveChangeStream = + changeStream(T::class.java) + +/** + * Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters. + * + * @author Christoph Strobl + * @since 2.2 + */ +inline fun ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<*>.asType(): ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection = + `as`(T::class.java) + +/** + * Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Christoph Strobl + * @since 2.2 + */ +@FlowPreview +fun ReactiveChangeStreamOperation.TerminatingChangeStream.flow(batchSize: Int = 1): Flow> = + listen().asFlow(batchSize) + diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java index bd75b3e8b7..abaf1141f5 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java @@ -70,7 +70,7 @@ public void changeStreamEventsShouldBeEmittedCorrectly() { BlockingQueue> documents = new LinkedBlockingQueue<>(100); - Disposable disposable = template.changeStream() // + Disposable disposable = template.changeStream(Document.class) // .watchCollection("person") // .listen() // .doOnNext(documents::add).subscribe(); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java index ad8124083f..d42cb7568f 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java @@ -59,7 +59,7 @@ public void setUp() { @Test // DATAMONGO-2089 public void listenWithoutDomainTypeUsesDocumentAsDefault() { - changeStreamSupport.changeStream().listen().subscribe(); + changeStreamSupport.changeStream(Document.class).listen().subscribe(); verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Document.class)); } @@ -104,7 +104,7 @@ public void listenWithDomainTypeCreatesTypedAggregation() { public void listenWithoutDomainTypeCreatesUntypedAggregation() { Criteria criteria = where("operationType").is("insert"); - changeStreamSupport.changeStream().filter(criteria).listen().subscribe(); + changeStreamSupport.changeStream(Document.class).filter(criteria).listen().subscribe(); ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); @@ -126,7 +126,7 @@ public void optionsShouldBePassedOnCorrectly() { Document filter = new Document("$match", new Document("operationType", "insert")); - changeStreamSupport.changeStream().withOptions(options -> { + changeStreamSupport.changeStream(Document.class).withOptions(options -> { options.filter(filter); }).listen().subscribe(); @@ -144,7 +144,7 @@ public void optionsShouldBeCombinedCorrectly() { Document filter = new Document("$match", new Document("operationType", "insert")); Instant resumeTimestamp = Instant.now(); - changeStreamSupport.changeStream().withOptions(options -> { + changeStreamSupport.changeStream(Document.class).withOptions(options -> { options.filter(filter); }).resumeAt(resumeTimestamp).listen().subscribe(); diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt new file mode 100644 index 0000000000..f6c67780da --- /dev/null +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core + +import example.first.First +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.bson.Document +import org.junit.Test +import reactor.core.publisher.Flux + +/** + * @author Christoph Strobl + * @soundtrack Rage Against The Machine - Take the Power Back + */ +class ReactiveChangeStreamOperationExtensionsTests { + + val operation = mockk(relaxed = true) + + @Test // DATAMONGO-2089 + fun `ReactiveChangeStreamOperation#changeStream() with reified type parameter extension should call its Java counterpart`() { + + operation.changeStream() + verify { operation.changeStream(First::class.java) } + } + + @Test // DATAMONGO-2089 + @FlowPreview + fun `TerminatingChangeStream#listen() flow extension`() { + + val doc1 = mockk>() + val doc2 = mockk>() + val doc3 = mockk>() + + val spec = mockk>() + every { spec.listen() } returns Flux.just(doc1, doc2, doc3) + + runBlocking { + assertThat(spec.flow().toList()).contains(doc1, doc2, doc3) + } + + verify { + spec.listen() + } + } +} diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc index 3d3df7ccc1..e794b388d7 100644 --- a/src/main/asciidoc/reference/change-streams.adoc +++ b/src/main/asciidoc/reference/change-streams.adoc @@ -51,14 +51,13 @@ Subscribing to Change Streams with the reactive API is a more natural approach t ==== [source,java] ---- -Flux> flux = reactiveTemplate.changeStream() +Flux> flux = reactiveTemplate.changeStream(User.class) <1> .watchCollection("persons") - .filter(where("age").gte(38)) <1> - .as(User.class) <2> - .listen(); <3> + .filter(where("age").gte(38)) <2> + .listen(); <3> ---- -<1> Use an aggregation pipeline or just a query `Criteria` to filter events. -<2> The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. +<1> The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. +<2> Use an aggregation pipeline or just a query `Criteria` to filter events. <3> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type from (2). ====