Skip to content

Commit fcd6187

Browse files
christophstroblmp911de
authored andcommitted
Fix usage of change stream option startAfter.
We now make sure to apply the token to startAfter method of the driver. Before this change it had been incorrectly applied to resumeAfter. Closes #4167. Original pull request: #4168.
1 parent 62bffc0 commit fcd6187

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -2221,7 +2221,11 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
22212221
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
22222222
}
22232223

2224-
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
2224+
if(options.isResumeAfter()) {
2225+
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
2226+
} else if (options.isStartAfter()) {
2227+
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher);
2228+
}
22252229
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
22262230
.orElse(publisher);
22272231
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.stream.Collectors;
4040

4141
import org.assertj.core.api.Assertions;
42+
import org.bson.BsonDocument;
43+
import org.bson.BsonString;
4244
import org.bson.Document;
4345
import org.bson.conversions.Bson;
4446
import org.bson.types.ObjectId;
@@ -111,6 +113,7 @@
111113
import com.mongodb.client.result.InsertOneResult;
112114
import com.mongodb.client.result.UpdateResult;
113115
import com.mongodb.reactivestreams.client.AggregatePublisher;
116+
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
114117
import com.mongodb.reactivestreams.client.DistinctPublisher;
115118
import com.mongodb.reactivestreams.client.FindPublisher;
116119
import com.mongodb.reactivestreams.client.MapReducePublisher;
@@ -146,6 +149,7 @@ public class ReactiveMongoTemplateUnitTests {
146149
@Mock DistinctPublisher distinctPublisher;
147150
@Mock Publisher deletePublisher;
148151
@Mock MapReducePublisher mapReducePublisher;
152+
@Mock ChangeStreamPublisher changeStreamPublisher;
149153

150154
private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator();
151155
private MappingMongoConverter converter;
@@ -1485,6 +1489,22 @@ void createCollectionShouldSetUpTimeSeries() {
14851489
.granularity(TimeSeriesGranularity.HOURS).toString());
14861490
}
14871491

1492+
@Test // GH-4167
1493+
void changeStreamOptionStartAftershouldApplied() {
1494+
1495+
when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
1496+
1497+
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
1498+
when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher);
1499+
when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher);
1500+
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
1501+
1502+
BsonDocument token = new BsonDocument("token", new BsonString("id"));
1503+
template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe();
1504+
1505+
verify(changeStreamPublisher).startAfter(eq(token));
1506+
}
1507+
14881508
private void stubFindSubscribe(Document document) {
14891509

14901510
Publisher<Document> realPublisher = Flux.just(document);

0 commit comments

Comments
 (0)