Skip to content

Commit da5f249

Browse files
christophstroblmp911de
authored andcommitted
Fix usage of change stream option startAfter.
We now make sure to apply the token to startAfter method of the driver. Before this change it had been incorrectly applied to resumeAfter. Closes #4167. Original pull request: #4168.
1 parent 7c7b05f commit da5f249

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -2141,7 +2141,11 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
21412141
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
21422142
}
21432143

2144-
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
2144+
if(options.isResumeAfter()) {
2145+
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
2146+
} else if (options.isStartAfter()) {
2147+
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher);
2148+
}
21452149
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
21462150
.orElse(publisher);
21472151
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.stream.Collectors;
4040

4141
import org.assertj.core.api.Assertions;
42+
import org.bson.BsonDocument;
43+
import org.bson.BsonString;
4244
import org.bson.Document;
4345
import org.bson.conversions.Bson;
4446
import org.bson.types.ObjectId;
@@ -110,6 +112,7 @@
110112
import com.mongodb.client.result.InsertOneResult;
111113
import com.mongodb.client.result.UpdateResult;
112114
import com.mongodb.reactivestreams.client.AggregatePublisher;
115+
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
113116
import com.mongodb.reactivestreams.client.DistinctPublisher;
114117
import com.mongodb.reactivestreams.client.FindPublisher;
115118
import com.mongodb.reactivestreams.client.MapReducePublisher;
@@ -145,6 +148,7 @@ public class ReactiveMongoTemplateUnitTests {
145148
@Mock DistinctPublisher distinctPublisher;
146149
@Mock Publisher deletePublisher;
147150
@Mock MapReducePublisher mapReducePublisher;
151+
@Mock ChangeStreamPublisher changeStreamPublisher;
148152

149153
private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator();
150154
private MappingMongoConverter converter;
@@ -1455,6 +1459,22 @@ void createCollectionShouldSetUpTimeSeries() {
14551459
.granularity(TimeSeriesGranularity.HOURS).toString());
14561460
}
14571461

1462+
@Test // GH-4167
1463+
void changeStreamOptionStartAftershouldApplied() {
1464+
1465+
when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
1466+
1467+
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
1468+
when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher);
1469+
when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher);
1470+
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
1471+
1472+
BsonDocument token = new BsonDocument("token", new BsonString("id"));
1473+
template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe();
1474+
1475+
verify(changeStreamPublisher).startAfter(eq(token));
1476+
}
1477+
14581478
private void stubFindSubscribe(Document document) {
14591479

14601480
Publisher<Document> realPublisher = Flux.just(document);

0 commit comments

Comments
 (0)