Skip to content

Commit 82e1227

Browse files
committed
Make ChangeStreamOptionsBuilder support startAtOperationTime
Closes spring-projects#3460
1 parent 84faff6 commit 82e1227

File tree

4 files changed

+81
-0
lines changed

4 files changed

+81
-0
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

+51
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
*
4141
* @author Christoph Strobl
4242
* @author Mark Paluch
43+
* @author Tudor Marc
4344
* @since 2.1
4445
*/
4546
public class ChangeStreamOptions {
@@ -49,6 +50,7 @@ public class ChangeStreamOptions {
4950
private @Nullable FullDocument fullDocumentLookup;
5051
private @Nullable Collation collation;
5152
private @Nullable Object resumeTimestamp;
53+
private @Nullable Object startAtOperationTime;
5254
private Resume resume = Resume.UNDEFINED;
5355

5456
protected ChangeStreamOptions() {}
@@ -96,6 +98,20 @@ public Optional<BsonTimestamp> getResumeBsonTimestamp() {
9698
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
9799
}
98100

101+
/**
102+
* @return {@link Optional#empty()} if not set.
103+
*/
104+
public Optional<Instant> getStartAtOperationTime() {
105+
return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, Instant.class));
106+
}
107+
108+
/**
109+
* @return {@link Optional#empty()} if not set.
110+
*/
111+
public Optional<BsonTimestamp> getStartAtOperationTimeBson() {
112+
return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
113+
}
114+
99115
/**
100116
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
101117
* @since 2.2
@@ -176,6 +192,9 @@ public boolean equals(Object o) {
176192
if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
177193
return false;
178194
}
195+
if (!ObjectUtils.nullSafeEquals(this.startAtOperationTime, that.startAtOperationTime)) {
196+
return false;
197+
}
179198
return resume == that.resume;
180199
}
181200

@@ -186,6 +205,7 @@ public int hashCode() {
186205
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup);
187206
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
188207
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
208+
result = 31 * result + ObjectUtils.nullSafeHashCode(startAtOperationTime);
189209
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
190210
return result;
191211
}
@@ -213,6 +233,7 @@ enum Resume {
213233
* Builder for creating {@link ChangeStreamOptions}.
214234
*
215235
* @author Christoph Strobl
236+
* @author Tudor Marc
216237
* @since 2.1
217238
*/
218239
public static class ChangeStreamOptionsBuilder {
@@ -222,6 +243,7 @@ public static class ChangeStreamOptionsBuilder {
222243
private @Nullable FullDocument fullDocumentLookup;
223244
private @Nullable Collation collation;
224245
private @Nullable Object resumeTimestamp;
246+
private @Nullable Object startAtOperationTime;
225247
private Resume resume = Resume.UNDEFINED;
226248

227249
private ChangeStreamOptionsBuilder() {}
@@ -351,6 +373,34 @@ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
351373
return this;
352374
}
353375

376+
/**
377+
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
378+
*
379+
* @param startAtOperationTime must not be {@literal null}.
380+
* @return this.
381+
*/
382+
public ChangeStreamOptionsBuilder startAtOperationTime(Instant startAtOperationTime) {
383+
384+
Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!");
385+
386+
this.startAtOperationTime = startAtOperationTime;
387+
return this;
388+
}
389+
390+
/**
391+
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
392+
*
393+
* @param startAtOperationTime must not be {@literal null}.
394+
* @return this.
395+
*/
396+
public ChangeStreamOptionsBuilder startAtOperationTime(BsonTimestamp startAtOperationTime) {
397+
398+
Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!");
399+
400+
this.startAtOperationTime = startAtOperationTime;
401+
return this;
402+
}
403+
354404
/**
355405
* Set the resume token after which to continue emitting notifications.
356406
*
@@ -393,6 +443,7 @@ public ChangeStreamOptions build() {
393443
options.fullDocumentLookup = this.fullDocumentLookup;
394444
options.collation = this.collation;
395445
options.resumeTimestamp = this.resumeTimestamp;
446+
options.startAtOperationTime = this.startAtOperationTime;
396447
options.resume = this.resume;
397448

398449
return options;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
/**
3535
* @author Christoph Strobl
36+
* @author Tudor Marc
3637
* @since 2.2
3738
*/
3839
class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation {
@@ -178,6 +179,8 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() {
178179
} else {
179180
options.getResumeTimestamp().ifPresent(builder::resumeAt);
180181
options.getResumeBsonTimestamp().ifPresent(builder::resumeAt);
182+
options.getStartAtOperationTime().ifPresent(builder::startAtOperationTime);
183+
options.getStartAtOperationTimeBson().ifPresent(builder::startAtOperationTime);
181184
}
182185

183186
return builder;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

+17
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ public Duration maxAwaitTime() {
232232
* Builder for creating {@link ChangeStreamRequest}.
233233
*
234234
* @author Christoph Strobl
235+
* @author Tudor Marc
235236
* @since 2.1
236237
* @see ChangeStreamOptions
237238
*/
@@ -409,6 +410,22 @@ public ChangeStreamRequestBuilder<T> startAfter(BsonValue resumeToken) {
409410
return this;
410411
}
411412

413+
/**
414+
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
415+
*
416+
* @param clusterTime must not be {@literal null}.
417+
* @return this.
418+
* @see ChangeStreamOptions#getStartAtOperationTime() ()
419+
* @see ChangeStreamOptionsBuilder#startAtOperationTime(java.time.Instant)
420+
*/
421+
public ChangeStreamRequestBuilder<T> startAtOperationTime(Instant clusterTime) {
422+
423+
Assert.notNull(clusterTime, "clusterTime must not be null!");
424+
this.delegate.startAtOperationTime(clusterTime);
425+
426+
return this;
427+
}
428+
412429
/**
413430
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
414431
*

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import static org.assertj.core.api.Assertions.*;
1919

2020
import org.bson.BsonDocument;
21+
import org.bson.BsonTimestamp;
2122
import org.junit.jupiter.api.Test;
2223

2324
/**
2425
* Unit tests for {@link ChangeStreamOptions}.
2526
*
2627
* @author Mark Paluch
28+
* @author Tudor Marc
2729
*/
2830
public class ChangeStreamOptionsUnitTests {
2931

@@ -53,4 +55,12 @@ public void shouldNotReportResumeStartAfter() {
5355
assertThat(options.isResumeAfter()).isFalse();
5456
assertThat(options.isStartAfter()).isFalse();
5557
}
58+
59+
@Test // DATAMONGO-2607
60+
public void shouldReportStartAtOperationTime() {
61+
62+
ChangeStreamOptions options = ChangeStreamOptions.builder().startAtOperationTime(new BsonTimestamp()).build();
63+
64+
assertThat(options.getStartAtOperationTime()).isNotNull();
65+
}
5666
}

0 commit comments

Comments
 (0)