diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index 583d958686..bfed68cc25 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -40,6 +40,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Tudor Marc * @since 2.1 */ public class ChangeStreamOptions { @@ -49,6 +50,7 @@ public class ChangeStreamOptions { private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; + private @Nullable Object startAtOperationTime; private Resume resume = Resume.UNDEFINED; protected ChangeStreamOptions() {} @@ -96,6 +98,20 @@ public Optional getResumeBsonTimestamp() { return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class)); } + /** + * @return {@link Optional#empty()} if not set. + */ + public Optional getStartAtOperationTime() { + return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, Instant.class)); + } + + /** + * @return {@link Optional#empty()} if not set. + */ + public Optional getStartAtOperationTimeBson() { + return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class)); + } + /** * @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}. * @since 2.2 @@ -176,6 +192,9 @@ public boolean equals(Object o) { if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) { return false; } + if (!ObjectUtils.nullSafeEquals(this.startAtOperationTime, that.startAtOperationTime)) { + return false; + } return resume == that.resume; } @@ -186,6 +205,7 @@ public int hashCode() { result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup); result = 31 * result + ObjectUtils.nullSafeHashCode(collation); result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp); + result = 31 * result + ObjectUtils.nullSafeHashCode(startAtOperationTime); result = 31 * result + ObjectUtils.nullSafeHashCode(resume); return result; } @@ -213,6 +233,7 @@ enum Resume { * Builder for creating {@link ChangeStreamOptions}. * * @author Christoph Strobl + * @author Tudor Marc * @since 2.1 */ public static class ChangeStreamOptionsBuilder { @@ -222,6 +243,7 @@ public static class ChangeStreamOptionsBuilder { private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; + private @Nullable Object startAtOperationTime; private Resume resume = Resume.UNDEFINED; private ChangeStreamOptionsBuilder() {} @@ -351,6 +373,34 @@ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) { return this; } + /** + * Set the cluster startAtOperationTime to open the cursor at a particular point in time. + * + * @param startAtOperationTime must not be {@literal null}. + * @return this. + */ + public ChangeStreamOptionsBuilder startAtOperationTime(Instant startAtOperationTime) { + + Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!"); + + this.startAtOperationTime = startAtOperationTime; + return this; + } + + /** + * Set the cluster startAtOperationTime to open the cursor at a particular point in time. + * + * @param startAtOperationTime must not be {@literal null}. + * @return this. + */ + public ChangeStreamOptionsBuilder startAtOperationTime(BsonTimestamp startAtOperationTime) { + + Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!"); + + this.startAtOperationTime = startAtOperationTime; + return this; + } + /** * Set the resume token after which to continue emitting notifications. * @@ -393,6 +443,7 @@ public ChangeStreamOptions build() { options.fullDocumentLookup = this.fullDocumentLookup; options.collation = this.collation; options.resumeTimestamp = this.resumeTimestamp; + options.startAtOperationTime = this.startAtOperationTime; options.resume = this.resume; return options; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 438289ac9d..459734ad8f 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -33,6 +33,7 @@ /** * @author Christoph Strobl + * @author Tudor Marc * @since 2.2 */ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation { @@ -178,6 +179,8 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() { } else { options.getResumeTimestamp().ifPresent(builder::resumeAt); options.getResumeBsonTimestamp().ifPresent(builder::resumeAt); + options.getStartAtOperationTime().ifPresent(builder::startAtOperationTime); + options.getStartAtOperationTimeBson().ifPresent(builder::startAtOperationTime); } return builder; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index 35b339d562..4037d1e070 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -232,6 +232,7 @@ public Duration maxAwaitTime() { * Builder for creating {@link ChangeStreamRequest}. * * @author Christoph Strobl + * @author Tudor Marc * @since 2.1 * @see ChangeStreamOptions */ @@ -409,6 +410,22 @@ public ChangeStreamRequestBuilder startAfter(BsonValue resumeToken) { return this; } + /** + * Set the cluster startAtOperationTime to open the cursor at a particular point in time. + * + * @param clusterTime must not be {@literal null}. + * @return this. + * @see ChangeStreamOptions#getStartAtOperationTime() () + * @see ChangeStreamOptionsBuilder#startAtOperationTime(java.time.Instant) + */ + public ChangeStreamRequestBuilder startAtOperationTime(Instant clusterTime) { + + Assert.notNull(clusterTime, "clusterTime must not be null!"); + this.delegate.startAtOperationTime(clusterTime); + + return this; + } + /** * Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}. * diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java index f4172ed30f..0acbf464f3 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java @@ -18,12 +18,14 @@ import static org.assertj.core.api.Assertions.*; import org.bson.BsonDocument; +import org.bson.BsonTimestamp; import org.junit.jupiter.api.Test; /** * Unit tests for {@link ChangeStreamOptions}. * * @author Mark Paluch + * @author Tudor Marc */ public class ChangeStreamOptionsUnitTests { @@ -53,4 +55,12 @@ public void shouldNotReportResumeStartAfter() { assertThat(options.isResumeAfter()).isFalse(); assertThat(options.isStartAfter()).isFalse(); } + + @Test // DATAMONGO-2607 + public void shouldReportStartAtOperationTime() { + + ChangeStreamOptions options = ChangeStreamOptions.builder().startAtOperationTime(new BsonTimestamp()).build(); + + assertThat(options.getStartAtOperationTime()).isNotNull(); + } }