diff --git a/pom.xml b/pom.xml index a401249d79..24f6bafc9f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c28a240d2c..64d7d599ea 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 0412911f82..007a78ba81 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index cb8c76ade4..0bbc20fa63 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.0.0-SNAPSHOT + 4.0.x-GH-4167-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index e8fad963c7..9af3b5518f 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1894,7 +1894,11 @@ public Flux> changeStream(@Nullable String database, @N publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class); } - publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + if(options.isResumeAfter()) { + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + } else if (options.isStartAfter()) { + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher); + } publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 0d9bca468c..b6fedd86e3 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -39,6 +39,8 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; +import org.bson.BsonDocument; +import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; import org.bson.types.ObjectId; @@ -111,6 +113,7 @@ import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.result.UpdateResult; import com.mongodb.reactivestreams.client.AggregatePublisher; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; import com.mongodb.reactivestreams.client.DistinctPublisher; import com.mongodb.reactivestreams.client.FindPublisher; import com.mongodb.reactivestreams.client.MapReducePublisher; @@ -146,6 +149,7 @@ public class ReactiveMongoTemplateUnitTests { @Mock DistinctPublisher distinctPublisher; @Mock Publisher deletePublisher; @Mock MapReducePublisher mapReducePublisher; + @Mock ChangeStreamPublisher changeStreamPublisher; private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator(); private MappingMongoConverter converter; @@ -1485,6 +1489,22 @@ void createCollectionShouldSetUpTimeSeries() { .granularity(TimeSeriesGranularity.HOURS).toString()); } + @Test // GH-4167 + void changeStreamOptionStartAftershouldApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + + BsonDocument token = new BsonDocument("token", new BsonString("id")); + template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe(); + + verify(changeStreamPublisher).startAfter(eq(token)); + } + private void stubFindSubscribe(Document document) { Publisher realPublisher = Flux.just(document);