Skip to content

Commit 9246d33

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-2115 - Add support for BsonTimestamp when resuming Change Streams.
Original pull request: #624.
1 parent 6e3e3e4 commit 9246d33

File tree

6 files changed

+119
-10
lines changed

6 files changed

+119
-10
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.time.Instant;
2121
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2222

23+
import org.bson.BsonTimestamp;
2324
import org.bson.BsonValue;
2425
import org.bson.Document;
2526
import org.springframework.data.mongodb.core.convert.MongoConverter;
@@ -84,8 +85,19 @@ public ChangeStreamDocument<Document> getRaw() {
8485
@Nullable
8586
public Instant getTimestamp() {
8687

87-
return raw != null && raw.getClusterTime() != null
88-
? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null;
88+
return getBsonTimestamp() != null ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class)
89+
: null;
90+
}
91+
92+
/**
93+
* Get the {@link ChangeStreamDocument#getClusterTime() cluster time}.
94+
*
95+
* @return can be {@literal null}.
96+
* @since 2.2
97+
*/
98+
@Nullable
99+
public BsonTimestamp getBsonTimestamp() {
100+
return raw != null ? raw.getClusterTime() : null;
89101
}
90102

91103
/**

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

+56-3
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import java.util.Arrays;
2222
import java.util.Optional;
2323

24+
import org.bson.BsonTimestamp;
2425
import org.bson.BsonValue;
2526
import org.bson.Document;
2627
import org.springframework.data.mongodb.core.aggregation.Aggregation;
2728
import org.springframework.data.mongodb.core.query.Collation;
2829
import org.springframework.lang.Nullable;
2930
import org.springframework.util.Assert;
31+
import org.springframework.util.ClassUtils;
32+
import org.springframework.util.ObjectUtils;
3033

3134
import com.mongodb.client.model.changestream.ChangeStreamDocument;
3235
import com.mongodb.client.model.changestream.FullDocument;
@@ -47,7 +50,7 @@ public class ChangeStreamOptions {
4750
private @Nullable BsonValue resumeToken;
4851
private @Nullable FullDocument fullDocumentLookup;
4952
private @Nullable Collation collation;
50-
private @Nullable Instant resumeTimestamp;
53+
private @Nullable Object resumeTimestamp;
5154

5255
protected ChangeStreamOptions() {}
5356

@@ -83,7 +86,15 @@ public Optional<Collation> getCollation() {
8386
* @return {@link Optional#empty()} if not set.
8487
*/
8588
public Optional<Instant> getResumeTimestamp() {
86-
return Optional.ofNullable(resumeTimestamp);
89+
return Optional.ofNullable(resumeTimestamp).map(this::asInstant);
90+
}
91+
92+
/**
93+
* @return {@link Optional#empty()} if not set.
94+
* @since 2.2
95+
*/
96+
public Optional<BsonTimestamp> getResumeBsonTimestamp() {
97+
return Optional.ofNullable(resumeTimestamp).map(this::asBsonTimestamp);
8798
}
8899

89100
/**
@@ -103,6 +114,33 @@ public static ChangeStreamOptionsBuilder builder() {
103114
return new ChangeStreamOptionsBuilder();
104115
}
105116

117+
private Instant asInstant(Object timestamp) {
118+
return asTimestampOfType(timestamp, Instant.class);
119+
}
120+
121+
private BsonTimestamp asBsonTimestamp(Object timestamp) {
122+
return asTimestampOfType(timestamp, BsonTimestamp.class);
123+
}
124+
125+
private <T> T asTimestampOfType(Object timestamp, Class<T> targetType) {
126+
127+
if (ClassUtils.isAssignableValue(targetType, timestamp)) {
128+
return (T) timestamp;
129+
}
130+
131+
if (timestamp instanceof Instant) {
132+
return (T) new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0);
133+
}
134+
135+
if (timestamp instanceof BsonTimestamp) {
136+
return (T) Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime());
137+
}
138+
139+
throw new IllegalArgumentException(
140+
"o_O that should actually not happen. The timestampt should be an Instant or a BsonTimestamp but was "
141+
+ ObjectUtils.nullSafeClassName(timestamp));
142+
}
143+
106144
/**
107145
* Builder for creating {@link ChangeStreamOptions}.
108146
*
@@ -115,7 +153,7 @@ public static class ChangeStreamOptionsBuilder {
115153
private @Nullable BsonValue resumeToken;
116154
private @Nullable FullDocument fullDocumentLookup;
117155
private @Nullable Collation collation;
118-
private @Nullable Instant resumeTimestamp;
156+
private @Nullable Object resumeTimestamp;
119157

120158
private ChangeStreamOptionsBuilder() {}
121159

@@ -224,6 +262,21 @@ public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) {
224262
return this;
225263
}
226264

265+
/**
266+
* Set the cluster time to resume from.
267+
*
268+
* @param resumeTimestamp must not be {@literal null}.
269+
* @return this.
270+
* @since 2.2
271+
*/
272+
public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
273+
274+
Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!");
275+
276+
this.resumeTimestamp = resumeTimestamp;
277+
return this;
278+
}
279+
227280
/**
228281
* @return the built {@link ChangeStreamOptions}
229282
*/

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.function.Function;
3232
import java.util.stream.Collectors;
3333

34-
import org.bson.BsonTimestamp;
3534
import org.bson.BsonValue;
3635
import org.bson.Document;
3736
import org.bson.codecs.Codec;
@@ -1927,8 +1926,7 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
19271926

19281927
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
19291928
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
1930-
publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
1931-
.map(publisher::startAtOperationTime).orElse(publisher);
1929+
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
19321930
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
19331931

19341932
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()));

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
115115
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
116116
: FullDocument.UPDATE_LOOKUP);
117117

118-
startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
119-
.orElse(null);
118+
startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
120119
}
121120

122121
MongoDatabase db = StringUtils.hasText(options.getDatabaseName())

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import lombok.Data;
2525
import lombok.NoArgsConstructor;
2626
import lombok.experimental.Wither;
27+
import org.bson.BsonTimestamp;
2728
import reactor.core.Disposable;
2829
import reactor.core.publisher.Flux;
2930
import reactor.core.publisher.Mono;
@@ -1399,6 +1400,49 @@ public void resumesAtTimestampCorrectly() throws InterruptedException {
13991400
}
14001401
}
14011402

1403+
@Test // DATAMONGO-2115
1404+
public void resumesAtBsonTimestampCorrectly() throws InterruptedException {
1405+
1406+
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
1407+
1408+
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
1409+
1410+
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
1411+
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
1412+
.doOnNext(documents::add).subscribe();
1413+
1414+
Thread.sleep(500); // just give it some time to link to the collection.
1415+
1416+
Person person1 = new Person("Spring", 38);
1417+
Person person2 = new Person("Data", 37);
1418+
Person person3 = new Person("MongoDB", 39);
1419+
1420+
StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete();
1421+
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
1422+
1423+
Thread.sleep(500); // just give it some time to link receive all events
1424+
1425+
disposable.dispose();
1426+
1427+
documents.take(); // skip first
1428+
BsonTimestamp resumeAt = documents.take().getBsonTimestamp(); // take 2nd
1429+
1430+
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
1431+
1432+
BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
1433+
template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
1434+
.doOnNext(resumeDocuments::add).subscribe();
1435+
1436+
Thread.sleep(500); // just give it some time to link receive all events
1437+
1438+
try {
1439+
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
1440+
.containsExactly(person2, person3);
1441+
} finally {
1442+
disposable.dispose();
1443+
}
1444+
}
1445+
14021446
private PersonWithAList createPersonWithAList(String firstname, int age) {
14031447

14041448
PersonWithAList p = new PersonWithAList();

src/main/asciidoc/reference/change-streams.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,6 @@ Flux<ChangeStreamEvent<Person>> resumed = template.changeStream("person", option
8181
<1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken`
8282
exposed through `getResumeToken`.
8383
====
84+
85+
TIP: In some cases an `Instant` might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native
86+
https://docs.mongodb.com/manual/reference/bson-types/#timestamps[BsonTimestamp] for that purpose.

0 commit comments

Comments
 (0)