From 9e58c567a0e1989e4cc4c43eeb8d0d6e74beedeb Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 2 Nov 2023 10:40:28 +0100 Subject: [PATCH 1/2] Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index f4eb82b8bb..8c2581e4e2 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.x-4495-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index 2de4b6b635..391e7296b2 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.x-4495-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 41b81f9aa6..440cb75563 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.x-4495-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index d7a9ddaa63..7a925187b8 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.x-4495-SNAPSHOT ../pom.xml From 304efcc278206f9f8ab21f54bcdc1ad0dacdf524 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 2 Nov 2023 11:41:19 +0100 Subject: [PATCH 2/2] Pass on FullDocumentBeforeChange option to change stream iterable/publisher. This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher. It also corrects false optional behaviour within the change stream task which did some defaulting though the actual value is an optional one that must not be present. --- .../ReactiveChangeStreamOperationSupport.java | 1 + .../mongodb/core/ReactiveMongoTemplate.java | 4 ++++ .../core/messaging/ChangeStreamTask.java | 12 ++++++---- .../core/ReactiveMongoTemplateUnitTests.java | 23 +++++++++++++++++-- .../messaging/ChangeStreamTaskUnitTests.java | 20 +++++++++++++--- 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 10c806f558..e3e74c7d45 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -168,6 +168,7 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() { } }); options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup); options.getCollation().ifPresent(builder::collation); if (options.isResumeAfter()) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 9b03f08209..3e926d8f97 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2050,6 +2050,10 @@ public Flux> changeStream(@Nullable String database, @N publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + + if(options.getFullDocumentBeforeChangeLookup().isPresent()) { + publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get()); + } return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); }) // .flatMapMany(publisher -> Flux.from(publisher) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index a19b65521d..ebe61665a2 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -37,7 +37,6 @@ import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; import org.springframework.lang.Nullable; @@ -88,7 +87,7 @@ protected MongoCursor> initCursor(MongoTemplate t Collation collation = null; FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; - FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT; + FullDocumentBeforeChange fullDocumentBeforeChange = null; BsonTimestamp startAt = null; boolean resumeAfter = true; @@ -116,8 +115,9 @@ protected MongoCursor> initCursor(MongoTemplate t .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup() - .orElse(FullDocumentBeforeChange.DEFAULT); + if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) { + fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get(); + } startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } @@ -158,7 +158,9 @@ protected MongoCursor> initCursor(MongoTemplate t } iterable = iterable.fullDocument(fullDocument); - iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + if(fullDocumentBeforeChange != null) { + iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + } return iterable.iterator(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index c7b1e8f03c..ddd71fca2a 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -20,8 +20,6 @@ import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import static org.springframework.data.mongodb.test.util.Assertions.assertThat; -import com.mongodb.WriteConcern; -import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Sith; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -101,6 +99,7 @@ import com.mongodb.MongoClientSettings; import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.DeleteOptions; @@ -110,6 +109,7 @@ import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.TimeSeriesGranularity; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.InsertManyResult; import com.mongodb.client.result.InsertOneResult; @@ -1604,6 +1604,25 @@ void changeStreamOptionStartAftershouldApplied() { verify(changeStreamPublisher).startAfter(eq(token)); } + @Test // GH-4495 + void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher); + + template + .changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class) + .subscribe(); + + verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + + } + @Test // GH-4462 void replaceShouldUseCollationWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index ae053277f4..a629cc8dea 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -39,6 +38,7 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * @author Christoph Strobl @@ -68,8 +68,6 @@ void setUp() { when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable); - - when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); } @Test // DATAMONGO-2258 @@ -125,6 +123,22 @@ void shouldApplyStartAfterToChangeStream() { verify(changeStreamIterable).startAfter(eq(resumeToken)); } + @Test // GH-4495 + void shouldApplyFullDocumentBeforeChangeToChangeStream() { + + when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); + + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("start-wars") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) // + .publishTo(message -> {}) // + .build(); + + initTask(request, Document.class); + + verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + } + private MongoCursor> initTask(ChangeStreamRequest request, Class targetType) { ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});