diff --git a/pom.xml b/pom.xml index 340e6922b4..cf7c2a99d7 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index bb7d9f03cc..6f8d9a6f8e 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index b32dcba387..49891d9610 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index b611cf01a8..3fb5cfeb32 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2089-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java new file mode 100644 index 0000000000..521cd9569e --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java @@ -0,0 +1,191 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.function.Consumer; + +import org.bson.BsonTimestamp; +import org.bson.BsonValue; +import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; + +/** + * {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB + * Change Stream operations in a fluent API * style.
+ * The starting {@literal domainType} is used for mapping a potentially given + * {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the + * originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}. + * However, it is possible to define an different {@literal returnType} via {@code as}.
+ * The collection to operate on is optional in which case call collection with the actual database are watched, use + * {@literal watchCollection} to define a fixed collection. + * + *
+ *     
+ *         changeStream(Human.class)
+ *             .watchCollection("star-wars")
+ *             .filter(where("operationType").is("insert"))
+ *             .as(Jedi.class)
+ *             .resumeAt(Instant.now())
+ *             .listen();
+ *     
+ * 
+ * + * @author Christoph Strobl + * @since 2.2 + */ +public interface ReactiveChangeStreamOperation { + + /** + * Start creating a change stream operation for the given {@literal domainType} watching all collections within the + * database.
+ * Consider limiting events be defining a {@link ChangeStreamWithCollection#watchCollection(String) collection} and/or + * {@link ChangeStreamWithFilterAndProjection#filter(CriteriaDefinition) filter}. + * + * @param domainType must not be {@literal null}. Use {@link org.bson.Document} to obtain raw elements. + * @return new instance of {@link ReactiveChangeStream}. Never {@literal null}. + * @throws IllegalArgumentException if domainType is {@literal null}. + */ + ReactiveChangeStream changeStream(Class domainType); + + /** + * Compose change stream execution by calling one of the terminating methods. + */ + interface TerminatingChangeStream { + + /** + * Start listening to changes. The stream will not be completed unless the {@link org.reactivestreams.Subscription} + * is {@link org.reactivestreams.Subscription#cancel() canceled}. + *

+ * However, the stream may become dead, or invalid, if all watched collections, databases are dropped. + */ + Flux> listen(); + } + + /** + * Collection override (optional). + */ + interface ChangeStreamWithCollection { + + /** + * Explicitly set the name of the collection to watch.
+ * Skip this step to watch all collections within the database. + * + * @param collection must not be {@literal null} nor {@literal empty}. + * @return new instance of {@link ChangeStreamWithCollection}. + * @throws IllegalArgumentException if collection is {@literal null}. + */ + ChangeStreamWithFilterAndProjection watchCollection(String collection); + } + + /** + * Provide a filter for limiting results (optional). + */ + interface ChangeStreamWithFilterAndProjection extends ResumingChangeStream, TerminatingChangeStream { + + /** + * Use an {@link Aggregation} to filter matching events. + * + * @param by must not be {@literal null}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if the given {@link Aggregation} is {@literal null}. + */ + ChangeStreamWithFilterAndProjection filter(Aggregation by); + + /** + * Use a {@link CriteriaDefinition critera} to filter matching events via an + * {@link org.springframework.data.mongodb.core.aggregation.MatchOperation}. + * + * @param by must not be {@literal null}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if the given {@link CriteriaDefinition} is {@literal null}. + */ + ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by); + + /** + * Define the target type fields should be mapped to. + * + * @param resultType must not be {@literal null}. + * @param result type. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if resultType is {@literal null}. + */ + ChangeStreamWithFilterAndProjection as(Class resultType); + } + + /** + * Resume a change stream. (optional). + */ + interface ResumingChangeStream extends TerminatingChangeStream { + + /** + * Resume the change stream at a given point. + * + * @param token an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#resumeAt(Instant) + * @see ChangeStreamOptionsBuilder#resumeAt(BsonTimestamp) + * @throws IllegalArgumentException if the given beacon is neither {@link Instant} nor {@link BsonTimestamp}. + */ + TerminatingChangeStream resumeAt(Object token); + + /** + * Resume the change stream after a given point. + * + * @param token an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#resumeAfter(BsonValue) + * @see ChangeStreamOptionsBuilder#resumeToken(BsonValue) + * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. + */ + TerminatingChangeStream resumeAfter(Object token); + + /** + * Start the change stream after a given point. + * + * @param token an {@link Instant} or {@link BsonTimestamp} + * @return new instance of {@link TerminatingChangeStream}. + * @see ChangeStreamOptionsBuilder#startAfter(BsonValue) (BsonValue) + * @throws IllegalArgumentException if the given beacon not a {@link BsonValue}. + */ + TerminatingChangeStream startAfter(Object token); + } + + /** + * Provide some options. + */ + interface ChangeStreamWithOptions { + + /** + * Provide some options via the callback by modifying the given {@link ChangeStreamOptionsBuilder}. Previously + * defined options like a {@link ResumingChangeStream#resumeAfter(Object) resumeToken} are carried over to the + * builder and can be overwritten via eg. {@link ChangeStreamOptionsBuilder#resumeToken(BsonValue)}. + * + * @param optionsConsumer never {@literal null}. + * @return new instance of {@link ReactiveChangeStream}. + */ + ReactiveChangeStream withOptions(Consumer optionsConsumer); + } + + /** + * {@link ReactiveChangeStream} provides methods for constructing change stream operations in a fluent way. + */ + interface ReactiveChangeStream extends ChangeStreamWithOptions, ChangeStreamWithCollection, + TerminatingChangeStream, ResumingChangeStream, ChangeStreamWithFilterAndProjection {} +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java new file mode 100644 index 0000000000..1090f8e417 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -0,0 +1,215 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.List; +import java.util.function.Consumer; + +import org.bson.BsonTimestamp; +import org.bson.BsonValue; +import org.bson.Document; +import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.MatchOperation; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * @author Christoph Strobl + * @since 2.2 + */ +class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation { + + private final ReactiveMongoTemplate template; + + /** + * @param template must not be {@literal null}. + */ + ReactiveChangeStreamOperationSupport(ReactiveMongoTemplate template) { + this.template = template; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) + */ + @Override + public ReactiveChangeStream changeStream(Class domainType) { + + Assert.notNull(domainType, "DomainType must not be null!"); + return new ReactiveChangeStreamSupport(template, domainType, domainType, null, null); + } + + static class ReactiveChangeStreamSupport + implements ReactiveChangeStream, ChangeStreamWithFilterAndProjection { + + private final ReactiveMongoTemplate template; + private final @Nullable Class domainType; + private final Class returnType; + private final @Nullable String collection; + private final @Nullable ChangeStreamOptions options; + + private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class domainType, Class returnType, + @Nullable String collection, @Nullable ChangeStreamOptions options) { + + this.template = template; + this.domainType = domainType; + this.returnType = returnType; + this.collection = collection; + this.options = options; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.String) + */ + @Override + public ChangeStreamWithFilterAndProjection watchCollection(String collection) { + + Assert.hasText(collection, "Collection name must not be null nor empty!"); + return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object) + */ + @Override + public TerminatingChangeStream resumeAt(Object token) { + + return withOptions(builder -> { + + if (token instanceof Instant) { + builder.resumeAt((Instant) token); + } else if (token instanceof BsonTimestamp) { + builder.resumeAt((BsonTimestamp) token); + } + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAfter(java.lang.Object) + */ + @Override + public TerminatingChangeStream resumeAfter(Object token) { + + Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.resumeAfter((BsonValue) token)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#startAfter(java.lang.Object) + */ + @Override + public TerminatingChangeStream startAfter(Object token) { + + Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.startAfter((BsonValue) token)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithOptions#withOptions(java.util.function.Consumer) + */ + @Override + public ReactiveChangeStreamSupport withOptions(Consumer optionsConsumer) { + + ChangeStreamOptionsBuilder builder = initOptionsBuilder(); + optionsConsumer.accept(builder); + + return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, builder.build()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithProjection#as(java.lang.Class) + */ + @Override + public ChangeStreamWithFilterAndProjection as(Class resultType) { + + Assert.notNull(resultType, "ResultType must not be null!"); + return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.aggregation.Aggregation) + */ + @Override + public ChangeStreamWithFilterAndProjection filter(Aggregation filter) { + return withOptions(builder -> builder.filter(filter)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilter#filter(org.springframework.data.mongodb.core.query.CriteriaDefinition) + */ + @Override + public ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by) { + + MatchOperation $match = Aggregation.match(by); + Aggregation aggregation = domainType != null && !Document.class.equals(domainType) + ? Aggregation.newAggregation(domainType, $match) + : Aggregation.newAggregation($match); + return filter(aggregation); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.TerminatingChangeStream#listen() + */ + @Override + public Flux> listen() { + return template.changeStream(collection, options != null ? options : ChangeStreamOptions.empty(), returnType); + } + + private ChangeStreamOptionsBuilder initOptionsBuilder() { + + ChangeStreamOptionsBuilder builder = ChangeStreamOptions.builder(); + if (options == null) { + return builder; + } + + options.getFilter().ifPresent(it -> { + if (it instanceof Aggregation) { + builder.filter((Aggregation) it); + } else { + builder.filter(((List) it).toArray(new Document[0])); + } + }); + options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getCollation().ifPresent(builder::collation); + + if (options.isResumeAfter()) { + options.getResumeToken().ifPresent(builder::resumeAfter); + options.getResumeBsonTimestamp().ifPresent(builder::resumeAfter); + } else if (options.isStartAfter()) { + options.getResumeToken().ifPresent(builder::startAfter); + } else { + options.getResumeTimestamp().ifPresent(builder::resumeAt); + options.getResumeBsonTimestamp().ifPresent(builder::resumeAt); + } + return builder; + + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java index d493cac3b4..efb1d8a35c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFluentMongoOperations.java @@ -23,4 +23,4 @@ * @since 2.0 */ public interface ReactiveFluentMongoOperations extends ReactiveFindOperation, ReactiveInsertOperation, - ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation {} + ReactiveUpdateOperation, ReactiveRemoveOperation, ReactiveAggregationOperation, ReactiveMapReduceOperation, ReactiveChangeStreamOperation {} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index ec0d5b663d..6faac53792 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2269,6 +2269,15 @@ public ReactiveMapReduce mapReduce(Class domainType) { return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType); } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation#changeStream(java.lang.Class) + */ + @Override + public ReactiveChangeStream changeStream(Class domainType) { + return new ReactiveChangeStreamOperationSupport(this).changeStream(domainType); + } + /** * Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)} * and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt new file mode 100644 index 0000000000..bb89842fd4 --- /dev/null +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.flow.asFlow + + +/** + * Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters. + * + * @author Christoph Strobl + * @since 2.2 + */ +inline fun ReactiveChangeStreamOperation.changeStream(): ReactiveChangeStreamOperation.ReactiveChangeStream = + changeStream(T::class.java) + +/** + * Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters. + * + * @author Christoph Strobl + * @since 2.2 + */ +inline fun ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<*>.asType(): ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection = + `as`(T::class.java) + +/** + * Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Christoph Strobl + * @since 2.2 + */ +@FlowPreview +fun ReactiveChangeStreamOperation.TerminatingChangeStream.flow(batchSize: Int = 1): Flow> = + listen().asFlow(batchSize) + diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java new file mode 100644 index 0000000000..abaf1141f5 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java @@ -0,0 +1,165 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.springframework.data.mongodb.core.query.Criteria.*; + +import lombok.SneakyThrows; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import org.assertj.core.api.Assertions; +import org.bson.Document; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.springframework.data.mongodb.test.util.MongoTestUtils; +import org.springframework.data.mongodb.test.util.ReplicaSet; + +import com.mongodb.reactivestreams.client.MongoClient; + +/** + * @author Christoph Strobl + * @currentRead Dawn Cook - The Decoy Princess + */ +public class ReactiveChangeStreamOperationSupportTests { + + public static @ClassRule ReplicaSet REPL_SET_REQUIRED = ReplicaSet.required(); + static final String DATABASE_NAME = "rx-change-stream"; + + MongoClient client; + ReactiveMongoTemplate template; + + @Before + public void setUp() { + + client = MongoTestUtils.reactiveReplSetClient(); + template = new ReactiveMongoTemplate(client, DATABASE_NAME); + + MongoTestUtils.createOrReplaceCollectionNow(DATABASE_NAME, "person", client); + } + + @After + public void tearDown() { + MongoTestUtils.dropCollectionNow(DATABASE_NAME, "person", client); + } + + @SneakyThrows + @Test // DATAMONGO-2089 + public void changeStreamEventsShouldBeEmittedCorrectly() { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream(Document.class) // + .watchCollection("person") // + .listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 39); + Person person3 = new Person("MongoDB", 37); + + Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)), + template.insert(person2).delayElement(Duration.ofMillis(2)), + template.insert(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) + .allMatch(val -> val instanceof Document); + } finally { + disposable.dispose(); + } + } + + @Test // DATAMONGO-1803 + public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedException { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream(Person.class).listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 39); + Person person3 = new Person("MongoDB", 37); + + Flux.merge(template.insert(person1).delayElement(Duration.ofMillis(2)), + template.insert(person2).delayElement(Duration.ofMillis(2)), + template.insert(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + .containsExactly(person1, person2, person3); + } finally { + disposable.dispose(); + } + } + + @Test // DATAMONGO-1803 + public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedException { + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + + Disposable disposable = template.changeStream(Person.class) // + .watchCollection("person") // + .filter(where("age").gte(38)) // + .listen() // + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 37); + Person person3 = new Person("MongoDB", 39); + + Flux.merge(template.save(person1).delayElement(Duration.ofMillis(2)), + template.save(person2).delayElement(Duration.ofMillis(2)), + template.save(person3).delayElement(Duration.ofMillis(2))) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + .containsExactly(person1, person3); + } finally { + disposable.dispose(); + } + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java new file mode 100644 index 0000000000..d42cb7568f --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java @@ -0,0 +1,164 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.isNull; +import static org.springframework.data.mongodb.core.query.Criteria.*; + +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.bson.Document; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.TypedAggregation; +import org.springframework.data.mongodb.core.query.Criteria; + +/** + * @author Christoph Strobl + * @currentRead Dawn Cook - The Decoy Princess + */ +@RunWith(MockitoJUnitRunner.class) +public class ReactiveChangeStreamOperationSupportUnitTests { + + @Mock ReactiveMongoTemplate template; + ReactiveChangeStreamOperationSupport changeStreamSupport; + + @Before + public void setUp() { + when(template.changeStream(any(), any(), any())).thenReturn(Flux.empty()); + changeStreamSupport = new ReactiveChangeStreamOperationSupport(template); + } + + @Test // DATAMONGO-2089 + public void listenWithoutDomainTypeUsesDocumentAsDefault() { + + changeStreamSupport.changeStream(Document.class).listen().subscribe(); + + verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Document.class)); + } + + @Test // DATAMONGO-2089 + public void listenWithDomainTypeUsesSourceAsTarget() { + + changeStreamSupport.changeStream(Person.class).listen().subscribe(); + + verify(template).changeStream(isNull(), eq(ChangeStreamOptions.empty()), eq(Person.class)); + } + + @Test // DATAMONGO-2089 + public void collectionNameIsPassedOnCorrectly() { + + changeStreamSupport.changeStream(Person.class).watchCollection("star-wars").listen().subscribe(); + + verify(template).changeStream(eq("star-wars"), eq(ChangeStreamOptions.empty()), eq(Person.class)); + } + + @Test // DATAMONGO-2089 + public void listenWithDomainTypeCreatesTypedAggregation() { + + Criteria criteria = where("operationType").is("insert"); + changeStreamSupport.changeStream(Person.class).filter(criteria).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Person.class)); + + assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> { + + assertThat(it).isInstanceOf(TypedAggregation.class); + TypedAggregation aggregation = (TypedAggregation) it; + + assertThat(aggregation.getInputType()).isEqualTo(Person.class); + assertThat(extractPipeline(aggregation)) + .containsExactly(new Document("$match", new Document("operationType", "insert"))); + }); + } + + @Test // DATAMONGO-2089 + public void listenWithoutDomainTypeCreatesUntypedAggregation() { + + Criteria criteria = where("operationType").is("insert"); + changeStreamSupport.changeStream(Document.class).filter(criteria).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue().getFilter()).hasValueSatisfying(it -> { + + assertThat(it).isInstanceOf(Aggregation.class); + assertThat(it).isNotInstanceOf(TypedAggregation.class); + + Aggregation aggregation = (Aggregation) it; + + assertThat(extractPipeline(aggregation)) + .containsExactly(new Document("$match", new Document("operationType", "insert"))); + }); + } + + @Test // DATAMONGO-2089 + public void optionsShouldBePassedOnCorrectly() { + + Document filter = new Document("$match", new Document("operationType", "insert")); + + changeStreamSupport.changeStream(Document.class).withOptions(options -> { + options.filter(filter); + }).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> { + assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter)); + }); + } + + @Test // DATAMONGO-2089 + public void optionsShouldBeCombinedCorrectly() { + + Document filter = new Document("$match", new Document("operationType", "insert")); + Instant resumeTimestamp = Instant.now(); + + changeStreamSupport.changeStream(Document.class).withOptions(options -> { + options.filter(filter); + }).resumeAt(resumeTimestamp).listen().subscribe(); + + ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(ChangeStreamOptions.class); + verify(template).changeStream(isNull(), optionsArgumentCaptor.capture(), eq(Document.class)); + + assertThat(optionsArgumentCaptor.getValue()).satisfies(it -> { + + assertThat(it.getFilter().get()).isEqualTo(Collections.singletonList(filter)); + assertThat(it.getResumeTimestamp()).contains(resumeTimestamp); + }); + } + + private static List extractPipeline(Aggregation aggregation) { + return aggregation.toDocument("person", Aggregation.DEFAULT_CONTEXT).get("pipeline", ArrayList.class); + } +} diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt new file mode 100644 index 0000000000..f6c67780da --- /dev/null +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core + +import example.first.First +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.bson.Document +import org.junit.Test +import reactor.core.publisher.Flux + +/** + * @author Christoph Strobl + * @soundtrack Rage Against The Machine - Take the Power Back + */ +class ReactiveChangeStreamOperationExtensionsTests { + + val operation = mockk(relaxed = true) + + @Test // DATAMONGO-2089 + fun `ReactiveChangeStreamOperation#changeStream() with reified type parameter extension should call its Java counterpart`() { + + operation.changeStream() + verify { operation.changeStream(First::class.java) } + } + + @Test // DATAMONGO-2089 + @FlowPreview + fun `TerminatingChangeStream#listen() flow extension`() { + + val doc1 = mockk>() + val doc2 = mockk>() + val doc3 = mockk>() + + val spec = mockk>() + every { spec.listen() } returns Flux.just(doc1, doc2, doc3) + + runBlocking { + assertThat(spec.flow().toList()).contains(doc1, doc2, doc3) + } + + verify { + spec.listen() + } + } +} diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc index c18bd0dd38..e794b388d7 100644 --- a/src/main/asciidoc/reference/change-streams.adoc +++ b/src/main/asciidoc/reference/change-streams.adoc @@ -51,14 +51,14 @@ Subscribing to Change Streams with the reactive API is a more natural approach t ==== [source,java] ---- -ChangeStreamOptions options = ChangeStreamOptions.builder() - .filter(newAggregation(User.class, match(where("age").gte(38))) <1> - .build(); - -Flux> flux = reactiveTemplate.changeStream("user", options, User.class); <2> +Flux> flux = reactiveTemplate.changeStream(User.class) <1> + .watchCollection("persons") + .filter(where("age").gte(38)) <2> + .listen(); <3> ---- -<1> Use an aggregation pipeline to filter events. -<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion. +<1> The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. +<2> Use an aggregation pipeline or just a query `Criteria` to filter events. +<3> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type from (2). ==== === Resuming Change Streams @@ -72,11 +72,11 @@ The following example shows how to set the resume offset using server time: ==== [source,java] ---- -ChangeStreamOptions = ChangeStreamOptions.builder() - .resumeAt(Instant.now().minusSeconds(1)) <1> - .build() - -Flux> resumed = template.changeStream("person", options, User.class) +Flux> resumed = template.changeStream() + .watchCollection("persons") + .as(User.class) + .resumeAt(Instant.now().minusSeconds(1)) <1> + .listen(); ---- <1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken` exposed through `getResumeToken`.