diff --git a/pom.xml b/pom.xml
index 28dc5c96c1..3bc9648f18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index c2ff37b35c..fd5c06d1cf 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml
index fd36f227c0..7911320f78 100644
--- a/spring-data-mongodb-cross-store/pom.xml
+++ b/spring-data-mongodb-cross-store/pom.xml
@@ -6,7 +6,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
../pom.xml
@@ -50,7 +50,7 @@
org.springframework.data
spring-data-mongodb
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index cb441dd8ef..4dc5282953 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -13,7 +13,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index f3c85a046a..f8abca6954 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -11,7 +11,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-2115-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
index 8f661c5af2..2702719fd1 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
@@ -20,6 +20,7 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.convert.MongoConverter;
@@ -84,8 +85,19 @@ public ChangeStreamDocument getRaw() {
@Nullable
public Instant getTimestamp() {
- return raw != null && raw.getClusterTime() != null
- ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null;
+ return getBsonTimestamp() != null ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class)
+ : null;
+ }
+
+ /**
+ * Get the {@link ChangeStreamDocument#getClusterTime() cluster time}.
+ *
+ * @return can be {@literal null}.
+ * @since 2.2
+ */
+ @Nullable
+ public BsonTimestamp getBsonTimestamp() {
+ return raw != null ? raw.getClusterTime() : null;
}
/**
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
index 42b0de8c1a..552a088c5b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
@@ -21,12 +21,15 @@
import java.util.Arrays;
import java.util.Optional;
+import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
@@ -47,7 +50,7 @@ public class ChangeStreamOptions {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
- private @Nullable Instant resumeTimestamp;
+ private @Nullable Object resumeTimestamp;
protected ChangeStreamOptions() {}
@@ -83,7 +86,15 @@ public Optional getCollation() {
* @return {@link Optional#empty()} if not set.
*/
public Optional getResumeTimestamp() {
- return Optional.ofNullable(resumeTimestamp);
+ return Optional.ofNullable(resumeTimestamp).map(this::asInstant);
+ }
+
+ /**
+ * @return {@link Optional#empty()} if not set.
+ * @since 2.2
+ */
+ public Optional getResumeBsonTimestamp() {
+ return Optional.ofNullable(resumeTimestamp).map(this::asBsonTimestamp);
}
/**
@@ -103,6 +114,33 @@ public static ChangeStreamOptionsBuilder builder() {
return new ChangeStreamOptionsBuilder();
}
+ private Instant asInstant(Object timestamp) {
+ return asTimestampOfType(timestamp, Instant.class);
+ }
+
+ private BsonTimestamp asBsonTimestamp(Object timestamp) {
+ return asTimestampOfType(timestamp, BsonTimestamp.class);
+ }
+
+ private T asTimestampOfType(Object timestamp, Class targetType) {
+
+ if (ClassUtils.isAssignableValue(targetType, timestamp)) {
+ return (T) timestamp;
+ }
+
+ if (timestamp instanceof Instant) {
+ return (T) new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0);
+ }
+
+ if (timestamp instanceof BsonTimestamp) {
+ return (T) Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime());
+ }
+
+ throw new IllegalArgumentException(
+ "o_O that should actually not happen. The timestampt should be an Instant or a BsonTimestamp but was "
+ + ObjectUtils.nullSafeClassName(timestamp));
+ }
+
/**
* Builder for creating {@link ChangeStreamOptions}.
*
@@ -115,7 +153,7 @@ public static class ChangeStreamOptionsBuilder {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
- private @Nullable Instant resumeTimestamp;
+ private @Nullable Object resumeTimestamp;
private ChangeStreamOptionsBuilder() {}
@@ -224,6 +262,21 @@ public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) {
return this;
}
+ /**
+ * Set the cluster time to resume from.
+ *
+ * @param resumeTimestamp must not be {@literal null}.
+ * @return this.
+ * @since 2.2
+ */
+ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
+
+ Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!");
+
+ this.resumeTimestamp = resumeTimestamp;
+ return this;
+ }
+
/**
* @return the built {@link ChangeStreamOptions}
*/
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index e5826ec663..0ea20fbf82 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -31,7 +31,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.Codec;
@@ -1927,8 +1926,7 @@ public Flux> changeStream(@Nullable String database, @N
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
- publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
- .map(publisher::startAtOperationTime).orElse(publisher);
+ publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()));
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
index 5d0ff694d7..7daae31c2b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
@@ -115,8 +115,7 @@ protected MongoCursor> initCursor(MongoTemplate t
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
- startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
- .orElse(null);
+ startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}
MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
index 22fc49317d..660daa239d 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
@@ -24,6 +24,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Wither;
+import org.bson.BsonTimestamp;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -1399,6 +1400,49 @@ public void resumesAtTimestampCorrectly() throws InterruptedException {
}
}
+ @Test // DATAMONGO-2115
+ public void resumesAtBsonTimestampCorrectly() throws InterruptedException {
+
+ Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
+
+ StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
+
+ BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+ Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
+ .doOnNext(documents::add).subscribe();
+
+ Thread.sleep(500); // just give it some time to link to the collection.
+
+ Person person1 = new Person("Spring", 38);
+ Person person2 = new Person("Data", 37);
+ Person person3 = new Person("MongoDB", 39);
+
+ StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete();
+ StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
+
+ Thread.sleep(500); // just give it some time to link receive all events
+
+ disposable.dispose();
+
+ documents.take(); // skip first
+ BsonTimestamp resumeAt = documents.take().getBsonTimestamp(); // take 2nd
+
+ StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
+
+ BlockingQueue> resumeDocuments = new LinkedBlockingQueue<>(100);
+ template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
+ .doOnNext(resumeDocuments::add).subscribe();
+
+ Thread.sleep(500); // just give it some time to link receive all events
+
+ try {
+ Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
+ .containsExactly(person2, person3);
+ } finally {
+ disposable.dispose();
+ }
+ }
+
private PersonWithAList createPersonWithAList(String firstname, int age) {
PersonWithAList p = new PersonWithAList();
diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc
index a31aad2a4d..c18bd0dd38 100644
--- a/src/main/asciidoc/reference/change-streams.adoc
+++ b/src/main/asciidoc/reference/change-streams.adoc
@@ -81,3 +81,6 @@ Flux> resumed = template.changeStream("person", option
<1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken`
exposed through `getResumeToken`.
====
+
+TIP: In some cases an `Instant` might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native
+https://docs.mongodb.com/manual/reference/bson-types/#timestamps[BsonTimestamp] for that purpose.