diff --git a/pom.xml b/pom.xml
index a401249d79..24f6bafc9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.0.0-SNAPSHOT
+ 4.0.x-GH-4167-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index c28a240d2c..64d7d599ea 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.0.0-SNAPSHOT
+ 4.0.x-GH-4167-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index 0412911f82..007a78ba81 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.0.0-SNAPSHOT
+ 4.0.x-GH-4167-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index cb8c76ade4..0bbc20fa63 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
org.springframework.data
spring-data-mongodb-parent
- 4.0.0-SNAPSHOT
+ 4.0.x-GH-4167-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index e8fad963c7..9af3b5518f 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -1894,7 +1894,11 @@ public Flux> changeStream(@Nullable String database, @N
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
- publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
+ if(options.isResumeAfter()) {
+ publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
+ } else if (options.isStartAfter()) {
+ publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher);
+ }
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
index 0d9bca468c..b6fedd86e3 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
@@ -39,6 +39,8 @@
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
@@ -111,6 +113,7 @@
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
+import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MapReducePublisher;
@@ -146,6 +149,7 @@ public class ReactiveMongoTemplateUnitTests {
@Mock DistinctPublisher distinctPublisher;
@Mock Publisher deletePublisher;
@Mock MapReducePublisher mapReducePublisher;
+ @Mock ChangeStreamPublisher changeStreamPublisher;
private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator();
private MappingMongoConverter converter;
@@ -1485,6 +1489,22 @@ void createCollectionShouldSetUpTimeSeries() {
.granularity(TimeSeriesGranularity.HOURS).toString());
}
+ @Test // GH-4167
+ void changeStreamOptionStartAftershouldApplied() {
+
+ when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
+
+ when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
+ when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher);
+ when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher);
+ when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
+
+ BsonDocument token = new BsonDocument("token", new BsonString("id"));
+ template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe();
+
+ verify(changeStreamPublisher).startAfter(eq(token));
+ }
+
private void stubFindSubscribe(Document document) {
Publisher realPublisher = Flux.just(document);