Skip to content

DATAMONGO-2115 - Add support for BsonTimestamp when resuming Change Streams. #624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions spring-data-mongodb-cross-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
</dependency>

<!-- reactive -->
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2115-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.convert.MongoConverter;
Expand Down Expand Up @@ -84,8 +85,19 @@ public ChangeStreamDocument<Document> getRaw() {
@Nullable
public Instant getTimestamp() {

return raw != null && raw.getClusterTime() != null
? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null;
return getBsonTimestamp() != null ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class)
: null;
}

/**
* Get the {@link ChangeStreamDocument#getClusterTime() cluster time}.
*
* @return can be {@literal null}.
* @since 2.2
*/
@Nullable
public BsonTimestamp getBsonTimestamp() {
return raw != null ? raw.getClusterTime() : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import java.util.Arrays;
import java.util.Optional;

import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
Expand All @@ -47,7 +50,7 @@ public class ChangeStreamOptions {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Instant resumeTimestamp;
private @Nullable Object resumeTimestamp;

protected ChangeStreamOptions() {}

Expand Down Expand Up @@ -83,7 +86,15 @@ public Optional<Collation> getCollation() {
* @return {@link Optional#empty()} if not set.
*/
public Optional<Instant> getResumeTimestamp() {
return Optional.ofNullable(resumeTimestamp);
return Optional.ofNullable(resumeTimestamp).map(this::asInstant);
}

/**
* @return {@link Optional#empty()} if not set.
* @since 2.2
*/
public Optional<BsonTimestamp> getResumeBsonTimestamp() {
return Optional.ofNullable(resumeTimestamp).map(this::asBsonTimestamp);
}

/**
Expand All @@ -103,6 +114,33 @@ public static ChangeStreamOptionsBuilder builder() {
return new ChangeStreamOptionsBuilder();
}

private Instant asInstant(Object timestamp) {
return asTimestampOfType(timestamp, Instant.class);
}

private BsonTimestamp asBsonTimestamp(Object timestamp) {
return asTimestampOfType(timestamp, BsonTimestamp.class);
}

private <T> T asTimestampOfType(Object timestamp, Class<T> targetType) {

if (ClassUtils.isAssignableValue(targetType, timestamp)) {
return (T) timestamp;
}

if (timestamp instanceof Instant) {
return (T) new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0);
}

if (timestamp instanceof BsonTimestamp) {
return (T) Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime());
}

throw new IllegalArgumentException(
"o_O that should actually not happen. The timestampt should be an Instant or a BsonTimestamp but was "
+ ObjectUtils.nullSafeClassName(timestamp));
}

/**
* Builder for creating {@link ChangeStreamOptions}.
*
Expand All @@ -115,7 +153,7 @@ public static class ChangeStreamOptionsBuilder {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Instant resumeTimestamp;
private @Nullable Object resumeTimestamp;

private ChangeStreamOptionsBuilder() {}

Expand Down Expand Up @@ -224,6 +262,21 @@ public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) {
return this;
}

/**
* Set the cluster time to resume from.
*
* @param resumeTimestamp must not be {@literal null}.
* @return this.
* @since 2.2
*/
public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {

Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!");

this.resumeTimestamp = resumeTimestamp;
return this;
}

/**
* @return the built {@link ChangeStreamOptions}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.Codec;
Expand Down Expand Up @@ -1927,8 +1926,7 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N

publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
.map(publisher::startAtOperationTime).orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));

return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);

startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
.orElse(null);
startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}

MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Wither;
import org.bson.BsonTimestamp;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -1399,6 +1400,49 @@ public void resumesAtTimestampCorrectly() throws InterruptedException {
}
}

@Test // DATAMONGO-2115
public void resumesAtBsonTimestampCorrectly() throws InterruptedException {

Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();

StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();

BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
.doOnNext(documents::add).subscribe();

Thread.sleep(500); // just give it some time to link to the collection.

Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
Person person3 = new Person("MongoDB", 39);

StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();

Thread.sleep(500); // just give it some time to link receive all events

disposable.dispose();

documents.take(); // skip first
BsonTimestamp resumeAt = documents.take().getBsonTimestamp(); // take 2nd

StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();

BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
.doOnNext(resumeDocuments::add).subscribe();

Thread.sleep(500); // just give it some time to link receive all events

try {
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person2, person3);
} finally {
disposable.dispose();
}
}

private PersonWithAList createPersonWithAList(String firstname, int age) {

PersonWithAList p = new PersonWithAList();
Expand Down
3 changes: 3 additions & 0 deletions src/main/asciidoc/reference/change-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ Flux<ChangeStreamEvent<Person>> resumed = template.changeStream("person", option
<1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken`
exposed through `getResumeToken`.
====

TIP: In some cases an `Instant` might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native
https://docs.mongodb.com/manual/reference/bson-types/#timestamps[BsonTimestamp] for that purpose.