From 4d3c5503f83cc2e1db893c7bf1f02ff59a9839c0 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Fri, 30 Nov 2018 13:02:17 +0100 Subject: [PATCH 1/2] DATAMONGO-2115 - Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-cross-store/pom.xml | 4 ++-- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 28dc5c96c1..3bc9648f18 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c2ff37b35c..fd5c06d1cf 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml index fd36f227c0..7911320f78 100644 --- a/spring-data-mongodb-cross-store/pom.xml +++ b/spring-data-mongodb-cross-store/pom.xml @@ -6,7 +6,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT ../pom.xml @@ -50,7 +50,7 @@ org.springframework.data spring-data-mongodb - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index cb441dd8ef..4dc5282953 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index f3c85a046a..f8abca6954 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-2115-SNAPSHOT ../pom.xml From 87c777c6d1d4f9ba636f769f3de2cb0b599ec450 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Fri, 30 Nov 2018 14:20:22 +0100 Subject: [PATCH 2/2] DATAMONGO-2115 - Add support for BsonTimestamp when resuming Change Streams. --- .../data/mongodb/core/ChangeStreamEvent.java | 16 ++++- .../mongodb/core/ChangeStreamOptions.java | 59 ++++++++++++++++++- .../mongodb/core/ReactiveMongoTemplate.java | 4 +- .../core/messaging/ChangeStreamTask.java | 3 +- .../core/ReactiveMongoTemplateTests.java | 44 ++++++++++++++ .../asciidoc/reference/change-streams.adoc | 3 + 6 files changed, 119 insertions(+), 10 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index 8f661c5af2..2702719fd1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.bson.BsonTimestamp; import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -84,8 +85,19 @@ public ChangeStreamDocument getRaw() { @Nullable public Instant getTimestamp() { - return raw != null && raw.getClusterTime() != null - ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null; + return getBsonTimestamp() != null ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) + : null; + } + + /** + * Get the {@link ChangeStreamDocument#getClusterTime() cluster time}. + * + * @return can be {@literal null}. + * @since 2.2 + */ + @Nullable + public BsonTimestamp getBsonTimestamp() { + return raw != null ? raw.getClusterTime() : null; } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index 42b0de8c1a..552a088c5b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -21,12 +21,15 @@ import java.util.Arrays; import java.util.Optional; +import org.bson.BsonTimestamp; import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ObjectUtils; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; @@ -47,7 +50,7 @@ public class ChangeStreamOptions { private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; - private @Nullable Instant resumeTimestamp; + private @Nullable Object resumeTimestamp; protected ChangeStreamOptions() {} @@ -83,7 +86,15 @@ public Optional getCollation() { * @return {@link Optional#empty()} if not set. */ public Optional getResumeTimestamp() { - return Optional.ofNullable(resumeTimestamp); + return Optional.ofNullable(resumeTimestamp).map(this::asInstant); + } + + /** + * @return {@link Optional#empty()} if not set. + * @since 2.2 + */ + public Optional getResumeBsonTimestamp() { + return Optional.ofNullable(resumeTimestamp).map(this::asBsonTimestamp); } /** @@ -103,6 +114,33 @@ public static ChangeStreamOptionsBuilder builder() { return new ChangeStreamOptionsBuilder(); } + private Instant asInstant(Object timestamp) { + return asTimestampOfType(timestamp, Instant.class); + } + + private BsonTimestamp asBsonTimestamp(Object timestamp) { + return asTimestampOfType(timestamp, BsonTimestamp.class); + } + + private T asTimestampOfType(Object timestamp, Class targetType) { + + if (ClassUtils.isAssignableValue(targetType, timestamp)) { + return (T) timestamp; + } + + if (timestamp instanceof Instant) { + return (T) new BsonTimestamp((int) ((Instant) timestamp).getEpochSecond(), 0); + } + + if (timestamp instanceof BsonTimestamp) { + return (T) Instant.ofEpochSecond(((BsonTimestamp) timestamp).getTime()); + } + + throw new IllegalArgumentException( + "o_O that should actually not happen. The timestampt should be an Instant or a BsonTimestamp but was " + + ObjectUtils.nullSafeClassName(timestamp)); + } + /** * Builder for creating {@link ChangeStreamOptions}. * @@ -115,7 +153,7 @@ public static class ChangeStreamOptionsBuilder { private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; - private @Nullable Instant resumeTimestamp; + private @Nullable Object resumeTimestamp; private ChangeStreamOptionsBuilder() {} @@ -224,6 +262,21 @@ public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) { return this; } + /** + * Set the cluster time to resume from. + * + * @param resumeTimestamp must not be {@literal null}. + * @return this. + * @since 2.2 + */ + public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) { + + Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!"); + + this.resumeTimestamp = resumeTimestamp; + return this; + } + /** * @return the built {@link ChangeStreamOptions} */ diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index e5826ec663..0ea20fbf82 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -31,7 +31,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.bson.BsonTimestamp; import org.bson.BsonValue; import org.bson.Document; import org.bson.codecs.Codec; @@ -1927,8 +1926,7 @@ public Flux> changeStream(@Nullable String database, @N publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); - publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0)) - .map(publisher::startAtOperationTime).orElse(publisher); + publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index 5d0ff694d7..7daae31c2b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -115,8 +115,7 @@ protected MongoCursor> initCursor(MongoTemplate t .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0)) - .orElse(null); + startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } MongoDatabase db = StringUtils.hasText(options.getDatabaseName()) diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java index 22fc49317d..660daa239d 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java @@ -24,6 +24,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Wither; +import org.bson.BsonTimestamp; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -1399,6 +1400,49 @@ public void resumesAtTimestampCorrectly() throws InterruptedException { } } + @Test // DATAMONGO-2115 + public void resumesAtBsonTimestampCorrectly() throws InterruptedException { + + Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue(); + + StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete(); + + BlockingQueue> documents = new LinkedBlockingQueue<>(100); + Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class) + .doOnNext(documents::add).subscribe(); + + Thread.sleep(500); // just give it some time to link to the collection. + + Person person1 = new Person("Spring", 38); + Person person2 = new Person("Data", 37); + Person person3 = new Person("MongoDB", 39); + + StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete(); + StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete(); + + Thread.sleep(500); // just give it some time to link receive all events + + disposable.dispose(); + + documents.take(); // skip first + BsonTimestamp resumeAt = documents.take().getBsonTimestamp(); // take 2nd + + StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete(); + + BlockingQueue> resumeDocuments = new LinkedBlockingQueue<>(100); + template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class) + .doOnNext(resumeDocuments::add).subscribe(); + + Thread.sleep(500); // just give it some time to link receive all events + + try { + Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + .containsExactly(person2, person3); + } finally { + disposable.dispose(); + } + } + private PersonWithAList createPersonWithAList(String firstname, int age) { PersonWithAList p = new PersonWithAList(); diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc index a31aad2a4d..c18bd0dd38 100644 --- a/src/main/asciidoc/reference/change-streams.adoc +++ b/src/main/asciidoc/reference/change-streams.adoc @@ -81,3 +81,6 @@ Flux> resumed = template.changeStream("person", option <1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken` exposed through `getResumeToken`. ==== + +TIP: In some cases an `Instant` might not be a precise enough measure when resuming a Change Stream. Use a MongoDB native +https://docs.mongodb.com/manual/reference/bson-types/#timestamps[BsonTimestamp] for that purpose.