Skip to content

Make ChangeStreamOptionsBuilder support startAtOperationTime #4071

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Tudor Marc
* @since 2.1
*/
public class ChangeStreamOptions {
Expand All @@ -49,6 +50,7 @@ public class ChangeStreamOptions {
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Object startAtOperationTime;
private Resume resume = Resume.UNDEFINED;

protected ChangeStreamOptions() {}
Expand Down Expand Up @@ -96,6 +98,20 @@ public Optional<BsonTimestamp> getResumeBsonTimestamp() {
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
}

/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Instant> getStartAtOperationTime() {
return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, Instant.class));
}

/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<BsonTimestamp> getStartAtOperationTimeBson() {
return Optional.ofNullable(startAtOperationTime).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
}

/**
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
* @since 2.2
Expand Down Expand Up @@ -176,6 +192,9 @@ public boolean equals(Object o) {
if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.startAtOperationTime, that.startAtOperationTime)) {
return false;
}
return resume == that.resume;
}

Expand All @@ -186,6 +205,7 @@ public int hashCode() {
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
result = 31 * result + ObjectUtils.nullSafeHashCode(startAtOperationTime);
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
return result;
}
Expand Down Expand Up @@ -213,6 +233,7 @@ enum Resume {
* Builder for creating {@link ChangeStreamOptions}.
*
* @author Christoph Strobl
* @author Tudor Marc
* @since 2.1
*/
public static class ChangeStreamOptionsBuilder {
Expand All @@ -222,6 +243,7 @@ public static class ChangeStreamOptionsBuilder {
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Object startAtOperationTime;
private Resume resume = Resume.UNDEFINED;

private ChangeStreamOptionsBuilder() {}
Expand Down Expand Up @@ -351,6 +373,34 @@ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
return this;
}

/**
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
*
* @param startAtOperationTime must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder startAtOperationTime(Instant startAtOperationTime) {

Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!");

this.startAtOperationTime = startAtOperationTime;
return this;
}

/**
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
*
* @param startAtOperationTime must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder startAtOperationTime(BsonTimestamp startAtOperationTime) {

Assert.notNull(startAtOperationTime, "startAtOperationTime must not be null!");

this.startAtOperationTime = startAtOperationTime;
return this;
}

/**
* Set the resume token after which to continue emitting notifications.
*
Expand Down Expand Up @@ -393,6 +443,7 @@ public ChangeStreamOptions build() {
options.fullDocumentLookup = this.fullDocumentLookup;
options.collation = this.collation;
options.resumeTimestamp = this.resumeTimestamp;
options.startAtOperationTime = this.startAtOperationTime;
options.resume = this.resume;

return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

/**
* @author Christoph Strobl
* @author Tudor Marc
* @since 2.2
*/
class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation {
Expand Down Expand Up @@ -178,6 +179,8 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() {
} else {
options.getResumeTimestamp().ifPresent(builder::resumeAt);
options.getResumeBsonTimestamp().ifPresent(builder::resumeAt);
options.getStartAtOperationTime().ifPresent(builder::startAtOperationTime);
options.getStartAtOperationTimeBson().ifPresent(builder::startAtOperationTime);
}

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public Duration maxAwaitTime() {
* Builder for creating {@link ChangeStreamRequest}.
*
* @author Christoph Strobl
* @author Tudor Marc
* @since 2.1
* @see ChangeStreamOptions
*/
Expand Down Expand Up @@ -409,6 +410,22 @@ public ChangeStreamRequestBuilder<T> startAfter(BsonValue resumeToken) {
return this;
}

/**
* Set the cluster startAtOperationTime to open the cursor at a particular point in time.
*
* @param clusterTime must not be {@literal null}.
* @return this.
* @see ChangeStreamOptions#getStartAtOperationTime() ()
* @see ChangeStreamOptionsBuilder#startAtOperationTime(java.time.Instant)
*/
public ChangeStreamRequestBuilder<T> startAtOperationTime(Instant clusterTime) {

Assert.notNull(clusterTime, "clusterTime must not be null!");
this.delegate.startAtOperationTime(clusterTime);

return this;
}

/**
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import static org.assertj.core.api.Assertions.*;

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.junit.jupiter.api.Test;

/**
* Unit tests for {@link ChangeStreamOptions}.
*
* @author Mark Paluch
* @author Tudor Marc
*/
public class ChangeStreamOptionsUnitTests {

Expand Down Expand Up @@ -53,4 +55,12 @@ public void shouldNotReportResumeStartAfter() {
assertThat(options.isResumeAfter()).isFalse();
assertThat(options.isStartAfter()).isFalse();
}

@Test // DATAMONGO-2607
public void shouldReportStartAtOperationTime() {

ChangeStreamOptions options = ChangeStreamOptions.builder().startAtOperationTime(new BsonTimestamp()).build();

assertThat(options.getStartAtOperationTime()).isNotNull();
}
}