From 2c163d80b28854aa90cad720dc1ec7448f30cce1 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 28 Feb 2024 11:01:24 -0800 Subject: [PATCH 01/18] Add Client Operation Timeout to Change Streams - Trigger timeout error when maxAwaitTimeMS exceeds a timeoutMS. - Apply timeoutMS to initial aggregate and subsequent next calls, without appending maxTimeMS to getMore. - Utilize maxAwaitTimeMS as maxTimeMS in getMore commands. - Preserve change stream validity upon timeout errors. JAVA-4054 --- build.gradle | 4 +- .../com/mongodb/internal/TimeoutContext.java | 2 +- .../operation/AggregateOperationImpl.java | 29 +- .../operation/ChangeStreamBatchCursor.java | 47 ++- .../operation/ChangeStreamOperation.java | 36 +- .../operation/CommandBatchCursor.java | 8 +- .../operation/CursorResourceManager.java | 8 +- .../internal/operation/OperationHelper.java | 2 +- .../change-streams.json | 8 +- .../ChangeStreamBatchCursorTest.java | 337 ++++++++++++++++++ .../com/mongodb/kotlin/client/MongoCursor.kt | 3 + .../ClientSideOperationTimeoutTest.java | 1 + .../client/MongoChangeStreamCursor.java | 6 + .../internal/ChangeStreamIterableImpl.java | 3 +- .../client/internal/MongoCollectionImpl.java | 2 +- .../client/internal/MongoDatabaseImpl.java | 2 +- .../client/internal/MongoIterableImpl.java | 1 + .../ClientSideOperationTimeoutTest.java | 2 - .../client/unified/UnifiedCrudHelper.java | 65 ++-- .../mongodb/client/unified/UnifiedTest.java | 2 + 20 files changed, 517 insertions(+), 51 deletions(-) create mode 100644 driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java diff --git a/build.gradle b/build.gradle index 9b20fb8763d..96e78e583aa 100644 --- a/build.gradle +++ b/build.gradle @@ -257,8 +257,8 @@ configure(javaCodeCheckedProjects) { testImplementation platform('org.spockframework:spock-bom:2.1-groovy-3.0') testImplementation 'org.spockframework:spock-core' testImplementation 'org.spockframework:spock-junit4' - testImplementation("org.mockito:mockito-core:3.8.0") - testImplementation("org.mockito:mockito-inline:3.8.0") + testImplementation("org.mockito:mockito-core:4.0.0") + testImplementation("org.mockito:mockito-inline:4.0.0") testImplementation 'cglib:cglib-nodep:2.2.2' testImplementation 'org.objenesis:objenesis:1.3' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index ee883530a3c..71ba0c47ea4 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -165,7 +165,7 @@ public TimeoutSettings getTimeoutSettings() { } public long getMaxAwaitTimeMS() { - return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS(); + return timeoutSettings.getMaxAwaitTimeMS(); } public long getMaxTimeMS() { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index 0edc474d9da..8373d119d8d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -16,9 +16,12 @@ package com.mongodb.internal.operation; +import com.mongodb.CursorType; import com.mongodb.MongoNamespace; import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.client.model.Collation; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; @@ -68,6 +71,7 @@ class AggregateOperationImpl implements AsyncReadOperation pipeline, final Decoder decoder, final AggregationLevel aggregationLevel) { @@ -146,6 +150,19 @@ AggregateOperationImpl retryReads(final boolean retryReads) { return this; } + /** + * When {@link TimeoutContext#hasTimeoutMS()} then {@link TimeoutSettings#getMaxAwaitTimeMS()} usage in {@code getMore} commands + * depends on the type of cursor. For {@link CursorType#TailableAwait} it is used, for others it is not. + * {@link CursorType#TailableAwait} is used mainly used for change streams in {@link AggregateOperationImpl}. + * + * @param cursorType + * @return this + */ + AggregateOperationImpl cursorType(final CursorType cursorType) { + this.cursorType = cursorType; + return this; + } + boolean getRetryReads() { return retryReads; } @@ -221,13 +238,13 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi private CommandReadTransformer> transformer() { return (result, source, connection) -> new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0, - getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection); + getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection); } private CommandReadTransformerAsync> asyncTransformer() { return (result, source, connection) -> new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0, - getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection); + getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection); } private TimeoutMode getTimeoutMode() { @@ -238,8 +255,12 @@ private TimeoutMode getTimeoutMode() { return localTimeoutMode; } - private long getMaxTimeForCursor(final OperationContext operationContext) { - return operationContext.getTimeoutContext().getMaxAwaitTimeMS(); + private long getMaxTimeForCursor(final TimeoutContext timeoutContext) { + long maxAwaitTimeMS = timeoutContext.getMaxAwaitTimeMS(); + if (timeoutContext.hasTimeoutMS()){ + return CursorType.TailableAwait == cursorType ? maxAwaitTimeMS : 0; + } + return maxAwaitTimeMS; } interface AggregateTarget { diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index a3c134b720c..bd20b2d5b2b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -18,8 +18,10 @@ import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoException; +import com.mongodb.MongoOperationTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -41,22 +43,31 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor changeStreamOperation; private final int maxWireVersion; - + private final TimeoutContext timeoutContext; private CommandBatchCursor wrapped; private BsonDocument resumeToken; private final AtomicBoolean closed; + /** + * This flag is used to manage change stream resumption logic after a timeout error. + * Indicates whether the last {@code next} call resulted in a {@link MongoOperationTimeoutException}. If {@code true}, + * indicates a timeout occurred, prompting an attempt to resume the change stream on the next call. + */ + private boolean lastOpTimeoutStatus; + ChangeStreamBatchCursor(final ChangeStreamOperation changeStreamOperation, final CommandBatchCursor wrapped, final ReadBinding binding, @Nullable final BsonDocument resumeToken, final int maxWireVersion) { + this.timeoutContext = binding.getOperationContext().getTimeoutContext(); this.changeStreamOperation = changeStreamOperation; this.binding = binding.retain(); this.wrapped = wrapped; this.resumeToken = resumeToken; this.maxWireVersion = maxWireVersion; closed = new AtomicBoolean(); + lastOpTimeoutStatus = false; } CommandBatchCursor getWrapped() { @@ -107,6 +118,7 @@ public List tryNext() { @Override public void close() { if (!closed.getAndSet(true)) { + resetTimeout(); wrapped.close(); binding.release(); } @@ -184,6 +196,19 @@ static List convertAndProduceLastId(final List rawDocume } R resumeableOperation(final Function, R> function) { + resetTimeout(); + try { + resumeAfterTimeout(); + R result = execute(function); + lastOpTimeoutStatus = false; + return result; + } catch (Throwable exception) { + lastOpTimeoutStatus = isTimeoutException(exception); + throw exception; + } + } + + private R execute(final Function, R> function) { while (true) { try { return function.apply(wrapped); @@ -192,6 +217,11 @@ R resumeableOperation(final Function { @@ -200,6 +230,21 @@ R resumeableOperation(final Function) changeStreamOperation.execute(binding)).getWrapped(); binding.release(); // release the new change stream batch cursor's reference to the binding + } + + private void resumeAfterTimeout() { + if (lastOpTimeoutStatus && !closed.get()) { + resumeChangeStream(); + } + } + + private void resetTimeout() { + if (timeoutContext.hasTimeoutMS()) { + timeoutContext.resetTimeout(); } } + + private static boolean isTimeoutException(final Throwable exception) { + return exception instanceof MongoOperationTimeoutException; + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 0afc4c5f1d0..12cc45d8b3f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -16,10 +16,12 @@ package com.mongodb.internal.operation; +import com.mongodb.CursorType; import com.mongodb.MongoNamespace; import com.mongodb.client.model.Collation; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; @@ -42,6 +44,7 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.client.cursor.TimeoutMode.CURSOR_LIFETIME; /** * An operation that executes an {@code $changeStream} aggregation. @@ -71,7 +74,7 @@ public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument final FullDocumentBeforeChange fullDocumentBeforeChange, final List pipeline, final Decoder decoder, final ChangeStreamLevel changeStreamLevel) { this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC, getAggregateTarget(), - getPipelineCreator()); + getPipelineCreator()).cursorType(CursorType.TailableAwait); this.fullDocument = notNull("fullDocument", fullDocument); this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange); this.decoder = notNull("decoder", decoder); @@ -167,9 +170,34 @@ public ChangeStreamOperation showExpandedEvents(final boolean showExpandedEve return this; } + /** + * Gets an aggregate operation with consideration for timeout settings. + *

+ * Change streams act similarly to tailable awaitData cursors, with identical timeoutMS option behavior. + * Key distinctions include: + * - The timeoutMS option must be applied at the start of the aggregate operation for change streams. + * - Change streams support resumption on next() calls. The driver handles automatic resumption for transient errors. + *

+ * + * As a result, when {@code timeoutContext.hasTimeoutMS()} the CURSOR_LIFETIME setting is utilized to manage the underlying cursor's + * lifespan in change streams. + * + * @param timeoutContext + * @return An AggregateOperationImpl + */ + private AggregateOperationImpl getAggregateOperation(final TimeoutContext timeoutContext) { + if (timeoutContext.hasTimeoutMS()) { + return wrapped.timeoutMode(CURSOR_LIFETIME); + } + return wrapped; + } + @Override public BatchCursor execute(final ReadBinding binding) { - CommandBatchCursor cursor = (CommandBatchCursor) wrapped.execute(binding); + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); + CommandBatchCursor cursor = (CommandBatchCursor) getAggregateOperation(timeoutContext).execute(binding); + cursor.setCloseImmediately(true); + return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()); @@ -177,11 +205,13 @@ public BatchCursor execute(final ReadBinding binding) { @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { - wrapped.executeAsync(binding, (result, t) -> { + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); + getAggregateOperation(timeoutContext).executeAsync(binding, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result); + callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index d80803baa14..395e7119eda 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -24,13 +24,13 @@ import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; import com.mongodb.annotations.ThreadSafe; -import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerType; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonTimestamp; @@ -59,6 +59,9 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { private final MongoNamespace namespace; + /** + * maxAwaitTimeMS + */ private final long maxTimeMS; private final Decoder decoder; @Nullable @@ -258,6 +261,9 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } + void setCloseImmediately(final boolean closeImmediately) { + this.resourceManager.setCloseImmediately(closeImmediately); + } @ThreadSafe private static final class ResourceManager extends CursorResourceManager { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java index 48a64081b05..8e692cef357 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java @@ -67,6 +67,7 @@ abstract class CursorResourceManager 0) { diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json index 6f9abf549ea..b382ef21643 100644 --- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json +++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json @@ -311,7 +311,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 20 + "timeoutMS": 100 }, "saveResultAsEntity": "changeStream" }, @@ -331,7 +331,7 @@ "aggregate" ], "blockConnection": true, - "blockTimeMS": 12, + "blockTimeMS": 40, "errorCode": 7, "errorLabels": [ "ResumableChangeStreamError" @@ -535,7 +535,7 @@ "getMore" ], "blockConnection": true, - "blockTimeMS": 15 + "blockTimeMS": 150 } } } @@ -545,7 +545,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 10 + "timeoutMS": 100 }, "saveResultAsEntity": "changeStream" }, diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java new file mode 100644 index 00000000000..93787b45103 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -0,0 +1,337 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.operation; + +import com.mongodb.MongoException; +import com.mongodb.MongoNotPrimaryException; +import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.ServerAddress; +import com.mongodb.connection.ServerDescription; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.binding.ConnectionSource; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.Connection; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.Document; +import org.bson.RawBsonDocument; +import org.bson.codecs.DocumentCodec; +import org.junit.Rule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.quality.Strictness; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +final class ChangeStreamBatchCursorTest { + + private static final List RESULT_FROM_NEW_CURSOR = new ArrayList<>(); + @Rule + public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + private final int maxWireVersion = ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION; + private ServerDescription serverDescription; + private TimeoutContext timeoutContext; + private OperationContext operationContext; + private Connection connection; + private ConnectionSource connectionSource; + private ReadBinding readBinding; + private BsonDocument resumeToken; + private CommandBatchCursor commandBatchCursor; + private CommandBatchCursor newCommandBatchCursor; + private ChangeStreamBatchCursor newChangeStreamCursor; + private ChangeStreamOperation changeStreamOperation; + + @Test + @DisplayName("should return result on next") + public void shouldReturnResultOnNext() { + when(commandBatchCursor.next()).thenReturn(RESULT_FROM_NEW_CURSOR); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //when + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + } + + @Test + @DisplayName("should throw timeout exception without resume attempt on next") + void shouldThrowTimeoutExceptionWithoutResumeAttemptOnNext() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verifyNoResumeAttemptCalled(); + } + + @Test + @DisplayName("should perform resume attempt on next when resumable error is thrown") + void shouldPerformResumeAttemptOnNextWhenResumableErrorIsThrown() { + when(commandBatchCursor.next()).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyResumeAttemptCalled(); + verify(changeStreamOperation, times(1)).getDecoder(); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + verifyNoMoreInteractions(changeStreamOperation); + } + + + @Test + @DisplayName("should resume only once on subsequent calls after timeout error") + void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verifyNoResumeAttemptCalled(); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when seconds next is called. Resume is attempted. + List next = cursor.next(); + + //then + assertEquals(Collections.emptyList(), next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).close(); + verifyNoMoreInteractions(commandBatchCursor); + verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + verify(changeStreamOperation, times(1)).getDecoder(); + verify(changeStreamOperation, times(1)).execute(readBinding); + verifyNoMoreInteractions(changeStreamOperation); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when third next is called. No resume is attempted. + List next2 = cursor.next(); + + //then + assertEquals(Collections.emptyList(), next2); + verifyNoInteractions(commandBatchCursor); + verify(timeoutContext, times(1)).resetTimeout(); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(readBinding); + verifyNoMoreInteractions(changeStreamOperation); + } + + @Test + @DisplayName("should propagate any errors occurred in aggregate operation during creating new change stream when previous next timed out") + void shouldPropagateAnyErrorsOccurredInAggregateOperation() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()); + when(changeStreamOperation.execute(readBinding)).thenThrow(resumableError); + + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + assertThrows(MongoNotPrimaryException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verifyResumeAttemptCalled(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + } + + + @Test + @DisplayName("should perform a resume attempt in subsequent next call when previous resume attempt in next timed out") + void shouldResumeAfterTimeoutInAggregateOnNextCall() { + //given + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //first next operation times out on getMore + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout during next call")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //second next operation times out on resume attempt when creating change stream + when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoOperationTimeoutException("timeout during resumption")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation); + + doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(readBinding); + + //when third operation succeeds to resume and call next + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + + verifyResumeAttemptCalled(); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + } + + @Test + @DisplayName("should close change stream when resume operation fails due to non-timeout error") + void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() { + //given + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //first next operation times out on getMore + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout during next call")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when second next operation errors on resume attempt when creating change stream + when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + assertThrows(MongoNotPrimaryException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verifyResumeAttemptCalled(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + + //when third next operation errors with cursor closed exception + doThrow(new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR)).when(commandBatchCursor).next(); + MongoException mongoException = assertThrows(MongoException.class, cursor::next); + + //then + assertEquals(MESSAGE_IF_CLOSED_AS_CURSOR, mongoException.getMessage()); + verify(timeoutContext, times(1)).resetTimeout(); + verifyNoResumeAttemptCalled(); + } + + private ChangeStreamBatchCursor createChangeStreamCursor() { + ChangeStreamBatchCursor cursor = + new ChangeStreamBatchCursor<>(changeStreamOperation, commandBatchCursor, readBinding, null, maxWireVersion); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + return cursor; + } + + private void verifyNoResumeAttemptCalled() { + verifyNoInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + verifyNoInteractions(readBinding); + } + + + private void verifyResumeAttemptCalled() { + verify(commandBatchCursor, times(1)).close(); + verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + verify(changeStreamOperation, times(1)).execute(readBinding); + verifyNoMoreInteractions(commandBatchCursor); + } + + @BeforeEach + void setUp() { + resumeToken = new BsonDocument("_id", new BsonInt32(1)); + serverDescription = mock(ServerDescription.class); + when(serverDescription.getMaxWireVersion()).thenReturn(maxWireVersion); + + timeoutContext = mock(TimeoutContext.class); + when(timeoutContext.hasTimeoutMS()).thenReturn(true); + doNothing().when(timeoutContext).resetTimeout(); + + operationContext = mock(OperationContext.class); + when(operationContext.getTimeoutContext()).thenReturn(timeoutContext); + connection = mock(Connection.class); + when(connection.command(any(), any(), any(), any(), any(), any())).thenReturn(null); + connectionSource = mock(ConnectionSource.class); + when(connectionSource.getConnection()).thenReturn(connection); + when(connectionSource.release()).thenReturn(1); + when(connectionSource.getServerDescription()).thenReturn(serverDescription); + + readBinding = mock(ReadBinding.class); + when(readBinding.getOperationContext()).thenReturn(operationContext); + when(readBinding.retain()).thenReturn(readBinding); + when(readBinding.release()).thenReturn(1); + when(readBinding.getReadConnectionSource()).thenReturn(connectionSource); + + + commandBatchCursor = mock(CommandBatchCursor.class); + when(commandBatchCursor.getPostBatchResumeToken()).thenReturn(resumeToken); + doNothing().when(commandBatchCursor).close(); + + newCommandBatchCursor = mock(CommandBatchCursor.class); + when(newCommandBatchCursor.getPostBatchResumeToken()).thenReturn(resumeToken); + when(newCommandBatchCursor.next()).thenReturn(RESULT_FROM_NEW_CURSOR); + doNothing().when(newCommandBatchCursor).close(); + + newChangeStreamCursor = mock(ChangeStreamBatchCursor.class); + when(newChangeStreamCursor.getWrapped()).thenReturn(newCommandBatchCursor); + + changeStreamOperation = mock(ChangeStreamOperation.class); + when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec()); + doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + when(changeStreamOperation.execute(readBinding)).thenReturn(newChangeStreamCursor); + } + +} diff --git a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt index 5c757bf5e65..6267f30e4a9 100644 --- a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt +++ b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt @@ -75,6 +75,9 @@ public sealed interface MongoCursor : Iterator, Closeable { * } * } * ``` + * If a [com.mongodb.MongoOperationTimeoutException] occurs before any events are received, it indicates that the server + * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the current + * stream and recreate it with a higher timeout setting. * * @param T The type of documents the cursor contains */ diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java index d55c6a78baf..5493264e2c0 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java @@ -52,6 +52,7 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String final BsonArray initialData, final BsonDocument definition) { super(schemaVersion, runOnRequirements, entities, initialData, definition); this.testDescription = testDescription; + assumeFalse("TODO (CSOT) - JAVA-4054", fileDescription.equals("timeoutMS behaves correctly for change streams")); // Time sensitive - cannot just create a cursor with publishers assumeFalse(testDescription.endsWith("createChangeStream on client")); diff --git a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java index 38e33c8ae8e..1408b43b7a1 100644 --- a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java +++ b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java @@ -33,6 +33,12 @@ * } * } * + *

+ * If a {@link com.mongodb.MongoOperationTimeoutException} occurs before any events are received, it indicates that the server + * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the current stream + * and recreate it with a higher timeout setting. + *

+ * * @since 3.11 * @param The type of documents the cursor contains */ diff --git a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java index 3a24da53984..51da1c8fd1e 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java @@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.assertions.Assertions.notNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; /** *

This class is not part of the public API and may be removed or changed at any time

@@ -117,7 +116,7 @@ public ChangeStreamIterable batchSize(final int batchSize) { @Override public ChangeStreamIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java index ce7f517545d..292537231cb 100755 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java @@ -199,7 +199,7 @@ public MongoCollection withReadConcern(final ReadConcern readConcern) public MongoCollection withTimeout(final long timeout, final TimeUnit timeUnit) { isTrueArgument("timeout >= 0", timeout >= 0); notNull("timeUnit", timeUnit); - TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(timeUnit.convert(timeout, TimeUnit.MILLISECONDS)); + TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)); return new MongoCollectionImpl<>(namespace, documentClass, codecRegistry, readPreference, writeConcern, retryWrites, retryReads, readConcern, uuidRepresentation, autoEncryptionSettings, newTimeoutSettings, executor); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java index 80b91796ca0..21c73a2bde0 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java @@ -152,7 +152,7 @@ public MongoDatabase withReadConcern(final ReadConcern readConcern) { public MongoDatabase withTimeout(final long timeout, final TimeUnit timeUnit) { isTrueArgument("timeout >= 0", timeout >= 0); notNull("timeUnit", timeUnit); - TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(timeUnit.convert(timeout, TimeUnit.MILLISECONDS)); + TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)); return new MongoDatabaseImpl(name, codecRegistry, readPreference, writeConcern, retryWrites, retryReads, readConcern, uuidRepresentation, autoEncryptionSettings, newTimeoutSettings, executor); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java index 0e1a46a8dd7..0e6da7b906b 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java @@ -153,4 +153,5 @@ public > A into(final A target) { private BatchCursor execute() { return getExecutor().execute(asReadOperation(), readPreference, readConcern, clientSession); } + } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java index 2091bbe975f..ddfd80ef1ec 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java @@ -79,13 +79,11 @@ public static void checkSkipCSOTTest(final String fileDescription, final String || testDescription.contains("runCommand on database")); assumeFalse("No count command helper", testDescription.endsWith("count on collection")); assumeFalse("No operation based overrides", fileDescription.equals("timeoutMS can be overridden for an operation")); - assumeFalse("No iterateOnce support", testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set")); assumeFalse("TODO (CSOT) - JAVA-5259 No client.withTimeout", testDescription.endsWith("on client")); checkTransactionSessionSupport(fileDescription, testDescription); - assumeFalse("TODO (CSOT) - JAVA-4054", fileDescription.equals("timeoutMS behaves correctly for change streams")); assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("timeoutMS behaves correctly for retryable operations")); assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("legacy timeouts behave correctly for retryable operations")); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 33e22167163..a017e350157 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -1556,34 +1556,33 @@ public OperationResult createChangeStreamCursor(final BsonDocument operation) { throw new UnsupportedOperationException("No entity found for id: " + entityName); } - for (Map.Entry cur : arguments.entrySet()) { - switch (cur.getKey()) { - case "batchSize": - iterable.batchSize(cur.getValue().asNumber().intValue()); - break; - case "pipeline": - break; - case "comment": - iterable.comment(cur.getValue()); - break; - case "fullDocument": - iterable.fullDocument(FullDocument.fromString(cur.getValue().asString().getValue())); - break; - case "fullDocumentBeforeChange": - iterable.fullDocumentBeforeChange(FullDocumentBeforeChange.fromString(cur.getValue().asString().getValue())); - break; - case "maxAwaitTimeMS": - iterable.maxAwaitTime(cur.getValue().asNumber().longValue(), TimeUnit.MILLISECONDS); - break; - case "showExpandedEvents": - iterable.showExpandedEvents(cur.getValue().asBoolean().getValue()); - break; - default: - throw new UnsupportedOperationException("Unsupported argument: " + cur.getKey()); - } - } - return resultOf(() -> { + for (Map.Entry cur : arguments.entrySet()) { + switch (cur.getKey()) { + case "batchSize": + iterable.batchSize(cur.getValue().asNumber().intValue()); + break; + case "pipeline": + break; + case "comment": + iterable.comment(cur.getValue()); + break; + case "fullDocument": + iterable.fullDocument(FullDocument.fromString(cur.getValue().asString().getValue())); + break; + case "fullDocumentBeforeChange": + iterable.fullDocumentBeforeChange(FullDocumentBeforeChange.fromString(cur.getValue().asString().getValue())); + break; + case "maxAwaitTimeMS": + iterable.maxAwaitTime(cur.getValue().asNumber().longValue(), TimeUnit.MILLISECONDS); + break; + case "showExpandedEvents": + iterable.showExpandedEvents(cur.getValue().asBoolean().getValue()); + break; + default: + throw new UnsupportedOperationException("Unsupported argument: " + cur.getKey()); + } + } MongoCursor changeStreamWrappingCursor = createChangeStreamWrappingCursor(iterable); entities.addCursor(operation.getString("saveResultAsEntity", new BsonString(createRandomEntityId())).getValue(), changeStreamWrappingCursor); @@ -1602,6 +1601,18 @@ public OperationResult executeIterateUntilDocumentOrError(final BsonDocument ope return resultOf(cursor::next); } + + public OperationResult executeIterateOnce(final BsonDocument operation) { + String id = operation.getString("object").getValue(); + MongoCursor cursor = entities.getCursor(id); + + if (operation.containsKey("arguments")) { + throw new UnsupportedOperationException("Unexpected arguments " + operation.get("arguments")); + } + + return resultOf(cursor::tryNext); + } + public OperationResult close(final BsonDocument operation) { String id = operation.getString("object").getValue(); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 6827ac5e39b..f972d47805a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -468,6 +468,8 @@ private OperationResult executeOperation(final UnifiedTestContext context, final return crudHelper.close(operation); case "iterateUntilDocumentOrError": return crudHelper.executeIterateUntilDocumentOrError(operation); + case "iterateOnce": + return crudHelper.executeIterateOnce(operation); case "delete": return gridFSHelper.executeDelete(operation); case "drop": From d8760abb195583ae82f41c16a34d3f6426a4834f Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 28 Feb 2024 15:13:01 -0800 Subject: [PATCH 02/18] Format code JAVA-4054 --- .../operation/ChangeStreamBatchCursor.java | 14 +++++++------- .../com/mongodb/kotlin/client/MongoCursor.kt | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index bd20b2d5b2b..0c79cb708ec 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -222,14 +222,14 @@ private R execute(final Function { - changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); - return null; - }); - wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); - binding.release(); // release the new change stream batch cursor's reference to the binding + withReadConnectionSource(binding, source -> { + changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + return null; + }); + wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); + binding.release(); // release the new change stream batch cursor's reference to the binding } private void resumeAfterTimeout() { diff --git a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt index 6267f30e4a9..69c755af9c0 100644 --- a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt +++ b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt @@ -75,9 +75,10 @@ public sealed interface MongoCursor : Iterator, Closeable { * } * } * ``` + * * If a [com.mongodb.MongoOperationTimeoutException] occurs before any events are received, it indicates that the server - * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the current - * stream and recreate it with a higher timeout setting. + * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the + * current stream and recreate it with a higher timeout setting. * * @param T The type of documents the cursor contains */ From 961481e76c948147490689e948e22a39c3bdee7c Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 28 Feb 2024 20:56:33 -0800 Subject: [PATCH 03/18] Fix test. JAVA-4054 --- .../unit/com/mongodb/internal/TimeoutContextTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 018e11712e0..8770dbe3b24 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -61,7 +61,6 @@ Collection timeoutContextTest() { assertAll( () -> assertTrue(timeoutContext.hasTimeoutMS()), () -> assertTrue(timeoutContext.getMaxTimeMS() > 0), - () -> assertEquals(0, timeoutContext.getMaxAwaitTimeMS()), () -> assertTrue(timeoutContext.getMaxCommitTimeMS() > 0) ); }), @@ -91,6 +90,13 @@ Collection timeoutContextTest() { () -> assertEquals(0, timeoutContext.getMaxCommitTimeMS()) ); }), + dynamicTest("MaxAwaitTimeMS set with timeoutMS", () -> { + TimeoutContext timeoutContext = + new TimeoutContext(TIMEOUT_SETTINGS_WITH_MAX_AWAIT_TIME.withWTimeoutMS(1L)); + assertAll( + () -> assertEquals(101, timeoutContext.getMaxAwaitTimeMS()) + ); + }), dynamicTest("MaxTimeMS and MaxAwaitTimeMS set", () -> { TimeoutContext timeoutContext = new TimeoutContext(TIMEOUT_SETTINGS_WITH_MAX_TIME_AND_AWAIT_TIME); From 37a99945bb5040003d687dda37e0dd3ae32aa319 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 28 Feb 2024 21:38:33 -0800 Subject: [PATCH 04/18] Adjust timeouts in tests. JAVA-4054 --- .../client-side-operation-timeout/change-streams.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json index b382ef21643..7cc28dbb05a 100644 --- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json +++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json @@ -311,7 +311,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 100 + "timeoutMS": 90 }, "saveResultAsEntity": "changeStream" }, From 728d6114caa4445e4133f1494e8ca26995535856 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 28 Feb 2024 22:06:48 -0800 Subject: [PATCH 05/18] Adjust timeouts in tests. JAVA-4054 --- .../client-side-operation-timeout/change-streams.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json index 7cc28dbb05a..9cb4db6cf47 100644 --- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json +++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json @@ -311,7 +311,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 90 + "timeoutMS": 80 }, "saveResultAsEntity": "changeStream" }, From de200fe57796503d5e24d2e357537ac37d295f58 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 1 Mar 2024 14:30:27 -0800 Subject: [PATCH 06/18] Add CSOT to asynchronous change stream cursor. JAVA-4054 --- .../AsyncChangeStreamBatchCursor.java | 15 +- .../operation/AsyncCommandBatchCursor.java | 4 + .../operation/ChangeStreamOperation.java | 1 + .../mongodb/client/test/CollectionHelper.java | 4 + .../connection/TestCommandListener.java | 4 + ...hangeStreamBatchCursorSpecification.groovy | 27 ++ .../ClientSideOperationTimeoutProseTest.java | 289 ++++++++++++++++++ .../client/syncadapter/SyncMongoCursor.java | 66 +++- .../ClientSideOperationTimeoutTest.java | 19 +- ...tClientSideOperationsTimeoutProseTest.java | 5 +- 10 files changed, 425 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index 7e55f05cac5..a5d958880d3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -17,6 +17,7 @@ package com.mongodb.internal.operation; import com.mongodb.MongoException; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; @@ -42,6 +43,7 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBatchCursor { private final AsyncReadBinding binding; + private final TimeoutContext timeoutContext; private final ChangeStreamOperation changeStreamOperation; private final int maxWireVersion; @@ -63,6 +65,7 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBat this.wrapped = new AtomicReference<>(assertNotNull(wrapped)); this.binding = binding; binding.retain(); + this.timeoutContext = binding.getOperationContext().getTimeoutContext(); this.resumeToken = resumeToken; this.maxWireVersion = maxWireVersion; isClosed = new AtomicBoolean(); @@ -80,6 +83,7 @@ public void next(final SingleResultCallback> callback) { @Override public void close() { + resetTimeout(); if (isClosed.compareAndSet(false, true)) { try { nullifyAndCloseWrapped(); @@ -177,6 +181,7 @@ private interface AsyncBlock { } private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback> callback, final boolean tryNext) { + resetTimeout(); SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (isClosed()) { errHandlingCallback.onResult(null, new MongoException(format("%s called after the cursor was closed.", @@ -219,12 +224,12 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, assertNotNull(source).getServerDescription().getMaxWireVersion()); source.release(); - changeStreamOperation.executeAsync(binding, (result, t1) -> { + changeStreamOperation.executeAsync(binding, (asyncBatchCursor, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { try { - setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) result).getWrapped()); + setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) asyncBatchCursor).getWrapped()); } finally { try { binding.release(); // release the new change stream batch cursor's reference to the binding @@ -237,4 +242,10 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb } }); } + + private void resetTimeout() { + if (timeoutContext.hasTimeoutMS()) { + timeoutContext.resetTimeout(); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 53bda3a505d..11792d09cf2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -212,6 +212,10 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } + void setCloseImmediately(final boolean closeImmediately) { + this.resourceManager.setCloseImmediately(closeImmediately); + } + @ThreadSafe private static final class ResourceManager extends CursorResourceManager { diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 12cc45d8b3f..005a91ff52f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -211,6 +211,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb callback.onResult(null, t); } else { AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result); + cursor.setCloseImmediately(true); callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java index 5be40320040..32703bd39c5 100644 --- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java +++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java @@ -176,6 +176,10 @@ public void insertDocuments(final BsonDocument... documents) { insertDocuments(asList(documents)); } + public void insertDocuments(final WriteConcern writeConcern, final BsonDocument... documents) { + insertDocuments(asList(documents), writeConcern); + } + public void insertDocuments(final List documents) { insertDocuments(documents, getBinding()); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index c02065363f9..c246a75cfcf 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -144,6 +144,10 @@ public CommandFailedEvent getCommandFailedEvent(final String commandName) { .orElseThrow(() -> new IllegalArgumentException(commandName + " not found in command failed event list")); } + public List getCommandFailedEvents() { + return getEvents(CommandFailedEvent.class, Integer.MAX_VALUE); + } + public List getCommandStartedEvents() { return getEvents(CommandStartedEvent.class, Integer.MAX_VALUE); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy index 4381e54f2e5..998c0a28b6e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy @@ -18,8 +18,10 @@ package com.mongodb.internal.operation import com.mongodb.MongoException import com.mongodb.async.FutureResultCallback +import com.mongodb.internal.TimeoutContext import com.mongodb.internal.async.SingleResultCallback import com.mongodb.internal.binding.AsyncReadBinding +import com.mongodb.internal.connection.OperationContext import org.bson.Document import spock.lang.Specification @@ -31,6 +33,12 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { given: def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS + def wrapped = Mock(AsyncCommandBatchCursor) def callback = Stub(SingleResultCallback) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, @@ -61,11 +69,19 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: 0 * wrapped.close() 0 * binding.release() + + where: + hasTimeoutMS << [true, false] } def 'should not close the cursor in next if the cursor was closed before next completed'() { def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS def wrapped = Mock(AsyncCommandBatchCursor) def callback = Stub(SingleResultCallback) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, @@ -86,11 +102,19 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: cursor.isClosed() + + where: + hasTimeoutMS << [true, false] } def 'should throw a MongoException when next/tryNext is called after the cursor is closed'() { def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS def wrapped = Mock(AsyncCommandBatchCursor) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, ServerVersionHelper.FOUR_DOT_FOUR_WIRE_VERSION) @@ -104,6 +128,9 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: def exception = thrown(MongoException) exception.getMessage() == 'next() called after the cursor was closed.' + + where: + hasTimeoutMS << [true, false] } List nextBatch(AsyncChangeStreamBatchCursor cursor) { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 141c60c6233..f3d033a74cc 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -18,21 +18,37 @@ import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCommandException; +import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandStartedEvent; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; import com.mongodb.reactivestreams.client.syncadapter.SyncGridFSBucket; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; +import org.bson.Document; import org.bson.types.ObjectId; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.test.StepVerifier; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -41,6 +57,8 @@ import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.ClusterFixture.sleep; +import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -85,6 +103,7 @@ protected boolean isAsync() { @Tag("setsFailPoint") @DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out") @Test + @Override public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -141,6 +160,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Tag("setsFailPoint") @DisplayName("6. GridFS Upload - Aborting an upload stream can be timed out") @Test + @Override public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -196,4 +216,273 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I testSubscriber.assertNoTerminalEvent(); } } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS applies to full resume attempt in a next call") + @Test + public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 500, TimeUnit.MILLISECONDS))) { + + MongoNamespace namespace = generateNamespace(); + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"getMore\" ]," + + " errorCode: 7," + + " errorLabels: [\"ResumableChangeStreamError\" ]" + + " }" + + "}"); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch( + singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))); + + Assertions.assertThrows(MongoOperationTimeoutException.class, + () -> Flux.from(documentChangeStreamPublisher).blockFirst(TIMEOUT_DURATION)); + //then + sleep(200); //let publisher invalidate the cursor after the error. + List commandStartedEvents = commandListener.getCommandStartedEvents(); + + List expectedCommandNames = Arrays.asList("aggregate", "getMore", "killCursors", "aggregate", "getMore", "killCursors"); + assertCommandStartedEventsInOder(expectedCommandNames, commandStartedEvents); + + List commandFailedEvents = commandListener.getCommandFailedEvents(); + assertEquals(2, commandFailedEvents.size()); + + CommandFailedEvent firstGetMoreFailedEvent = commandFailedEvents.get(0); + assertEquals("getMore", firstGetMoreFailedEvent.getCommandName()); + assertInstanceOf(MongoCommandException.class, firstGetMoreFailedEvent.getThrowable()); + + CommandFailedEvent secondGetMoreFailedEvent = commandFailedEvents.get(1); + assertEquals("getMore", secondGetMoreFailedEvent.getCommandName()); + assertInstanceOf(MongoOperationTimeoutException.class, secondGetMoreFailedEvent.getThrowable()); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS applied to initial aggregate") + @Test + public void testTimeoutMSAppliedToInitialAggregate() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + + MongoNamespace namespace = generateNamespace(); + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch( + singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))) + .fullDocument(FullDocument.UPDATE_LOOKUP); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"aggregate\" ]," + + " blockConnection: true," + + " blockTimeMS: " + (rtt + 201) + + " }" + + "}"); + + //when + Assertions.assertThrows(MongoOperationTimeoutException.class, + () -> Flux.from(documentChangeStreamPublisher).blockFirst(TIMEOUT_DURATION)); + + //We do not expect cursor to have been created. However, publisher closes cursor asynchronously, thus we give it some time + // to make sure that cursor has not been closed (which would indicate that it was created). + sleep(200); + + //then + List commandStartedEvents = commandListener.getCommandStartedEvents(); + assertEquals(1, commandStartedEvents.size()); + assertEquals("aggregate", commandStartedEvents.get(0).getCommandName()); + assertOnlyOneCommandTimeoutFailure("aggregate"); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is not set") + @Test + public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); + sleep(2000); + + + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(1000, TimeUnit.MILLISECONDS))) { + + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 2}," + + " data: {" + + " failCommands: [\"getMore\" ]," + + " blockConnection: true," + + " blockTimeMS: " + (rtt + 800) + + " }" + + "}"); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch() + .startAtOperationTime(startTime); + StepVerifier.create(documentChangeStreamPublisher, 2) + //then + .thenAwait(Duration.ofMillis(300)) + .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, + BsonDocument.parse("{x: 1}"), + BsonDocument.parse("{x: 2}"), + BsonDocument.parse("{x: 3}"), + BsonDocument.parse("{x: 4}"))) + .expectNextCount(2) + .thenAwait(Duration.ofMillis(1000)) + .thenRequest(2) + .expectNextCount(2) + .thenAwait(Duration.ofMillis(1000)) + .thenRequest(2) + .expectError(MongoOperationTimeoutException.class) + .verify(); + + sleep(200); //let publisher invalidate the cursor after the error. + + List commandStartedEvents = commandListener.getCommandStartedEvents(); + List expectedCommandNames = Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"); + assertCommandStartedEventsInOder(expectedCommandNames, commandStartedEvents); + assertOnlyOneCommandTimeoutFailure("getMore"); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is set") + @Test + public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); + sleep(2000); + + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(300, TimeUnit.MILLISECONDS))) { + + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 2}," + + " data: {" + + " failCommands: [\"aggregate\", \"getMore\"]," + + " blockConnection: true," + + " blockTimeMS: " + (rtt + 200) + + " }" + + "}"); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch() + .maxAwaitTime(1, TimeUnit.MILLISECONDS) + .startAtOperationTime(startTime); + StepVerifier.create(documentChangeStreamPublisher, 2) + //then + .thenAwait(Duration.ofMillis(300)) + .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, + BsonDocument.parse("{x: 1}"), + BsonDocument.parse("{x: 2}"))) + .thenAwait(Duration.ofMillis(300)) + .thenRequest(2) + .thenCancel() + .verify(); + + sleep(200); //let publisher invalidate the cursor after the error. + + List commandStartedEvents = commandListener.getCommandStartedEvents(); + List expectedCommandNames = Arrays.asList("aggregate", "getMore", "killCursors"); + assertCommandStartedEventsInOder(expectedCommandNames, commandStartedEvents); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS is honored for next operation when several getMore executed internally") + @Test + public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInternally() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) { + + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch(); + StepVerifier.create(documentChangeStreamPublisher, 2) + //then + .expectError(MongoOperationTimeoutException.class) + .verify(); + + sleep(200); //let publisher invalidate the cursor after the error. + + List commandStartedEvents = commandListener.getCommandStartedEvents(); + assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"), + commandStartedEvents); + assertOnlyOneCommandTimeoutFailure("getMore"); + } + } + + private static void assertCommandStartedEventsInOder(final List expectedCommandNames, + final List commandStartedEvents) { + assertEquals(expectedCommandNames.size(), commandStartedEvents.size()); + + for (int i = 0; i < expectedCommandNames.size(); i++) { + CommandStartedEvent commandStartedEvent = commandStartedEvents.get(i); + + assertEquals(expectedCommandNames.get(i), commandStartedEvent.getCommandName()); + } + } + + private void assertOnlyOneCommandTimeoutFailure(final String command) { + List commandFailedEvents = commandListener.getCommandFailedEvents(); + assertEquals(1, commandFailedEvents.size()); + + CommandFailedEvent failedAggregateCommandEvent = commandFailedEvents.get(0); + assertEquals(command, commandFailedEvents.get(0).getCommandName()); + assertInstanceOf(MongoOperationTimeoutException.class, failedAggregateCommandEvent.getThrowable()); + } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index 04f79455b43..4c571bbd52c 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -21,16 +21,23 @@ import com.mongodb.ServerCursor; import com.mongodb.client.MongoCursor; import com.mongodb.lang.Nullable; +import com.mongodb.reactivestreams.client.internal.BatchCursor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Operators; import java.util.NoSuchElementException; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; @@ -42,6 +49,7 @@ class SyncMongoCursor implements MongoCursor { private static final Object COMPLETED = new Object(); private final BlockingDeque results = new LinkedBlockingDeque<>(); + private final CompletableFuture batchCursorCompletableFuture = new CompletableFuture<>(); private final Integer batchSize; private int countToBatchSize; private Subscription subscription; @@ -51,7 +59,11 @@ class SyncMongoCursor implements MongoCursor { SyncMongoCursor(final Publisher publisher, @Nullable final Integer batchSize) { this.batchSize = batchSize; - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch subscriptionLatch = new CountDownLatch(1); + Hooks.onEachOperator(Operators.lift((sc, sub) -> + new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture + ))); + //noinspection ReactiveStreamsSubscriberImplementation Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber() { @Override @@ -62,7 +74,7 @@ public void onSubscribe(final Subscription s) { } else { subscription.request(batchSize); } - latch.countDown(); + subscriptionLatch.countDown(); } @Override @@ -82,12 +94,19 @@ public void onComplete() { } }); try { - if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) { + if (!subscriptionLatch.await(TIMEOUT, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Timeout waiting for subscription"); } + batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); sleep(getSleepAfterCursorOpen()); } catch (InterruptedException e) { throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); + } catch (ExecutionException | TimeoutException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new RuntimeException(e); } } @@ -184,4 +203,45 @@ private RuntimeException translateError(final Throwable throwable) { } return new RuntimeException(throwable); } + + + static class BatchCursorInterceptSubscriber implements CoreSubscriber { + + private final CoreSubscriber sub; + private final CompletableFuture batchCursorCompletableFuture; + + + BatchCursorInterceptSubscriber(final CoreSubscriber sub, + final CompletableFuture batchCursorCompletableFuture) { + this.sub = sub; + this.batchCursorCompletableFuture = batchCursorCompletableFuture; + } + + @Override + public void onSubscribe(final Subscription s) { + sub.onSubscribe(s); + } + + @Override + public void onNext(final Object o) { + if (o instanceof BatchCursor) { + // Interception of a cursor means that it has been created at this point. + batchCursorCompletableFuture.complete((BatchCursor) o); + } + sub.onNext(o); + } + + @Override + public void onError(final Throwable t) { + if (!batchCursorCompletableFuture.isDone()) { // cursor has not been created yet but an error occurred. + batchCursorCompletableFuture.completeExceptionally(t); + } + sub.onError(t); + } + + @Override + public void onComplete() { + sub.onComplete(); + } + } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java index 5493264e2c0..e50a3a9b30e 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java @@ -52,9 +52,24 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String final BsonArray initialData, final BsonDocument definition) { super(schemaVersion, runOnRequirements, entities, initialData, definition); this.testDescription = testDescription; - assumeFalse("TODO (CSOT) - JAVA-4054", fileDescription.equals("timeoutMS behaves correctly for change streams")); - // Time sensitive - cannot just create a cursor with publishers + + assumeFalse("No iterateOnce support. There is alternative prose test for it.", + testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set")); + + assumeFalse("No iterateOnce support. There is alternative prose test for it.", + testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is set")); + /* + The Reactive Streams specification prevents us from allowing a subsequent next call (event in reactive terms) after a timeout error, + conflicting with the CSOT spec requirement not to invalidate the change stream and to try resuming and establishing a new change + stream on the server. We immediately let users know about a timeout error, which then closes the stream/publisher. + */ + assumeFalse("It is not possible due to a conflict with the Reactive Streams specification .", + testDescription.equals("change stream can be iterated again if previous iteration times out")); + + assumeFalse("Flaky and racy due to asynchronous behaviour. There is alternative prose test for it.", + testDescription.equals("timeoutMS applies to full resume attempt in a next call")); + assumeFalse(testDescription.endsWith("createChangeStream on client")); assumeFalse(testDescription.endsWith("createChangeStream on database")); assumeFalse(testDescription.endsWith("createChangeStream on collection")); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 69784b4bb6d..43b4897a318 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -96,7 +96,7 @@ public abstract class AbstractClientSideOperationsTimeoutProseTest { protected static final String GRID_FS_BUCKET_NAME = "db.fs"; private static final AtomicInteger COUNTER = new AtomicInteger(); - private MongoNamespace namespace; + protected MongoNamespace namespace; protected MongoNamespace gridFsFileNamespace; protected MongoNamespace gridFsChunksNamespace; @@ -275,6 +275,7 @@ public void testBlockingIterationMethodsChangeStream() { @Tag("setsFailPoint") @DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out") + @Test public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -475,7 +476,7 @@ private static Stream test8ServerSelectionHandshakeArguments() { ); } - private MongoNamespace generateNamespace() { + protected MongoNamespace generateNamespace() { return new MongoNamespace(getDefaultDatabaseName(), getClass().getSimpleName() + "_" + COUNTER.incrementAndGet()); } From da6eaea4dc98cdbb068d1b4086fee1fef38110f4 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 1 Mar 2024 15:38:49 -0800 Subject: [PATCH 07/18] Fix tests. JAVA-4054 --- .../reactivestreams/client/syncadapter/SyncMongoCursor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index 4c571bbd52c..6b1d83e4812 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -29,6 +29,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; import reactor.core.publisher.Operators; +import reactor.util.context.Context; import java.util.NoSuchElementException; import java.util.concurrent.BlockingDeque; @@ -217,6 +218,11 @@ static class BatchCursorInterceptSubscriber implements CoreSubscriber { this.batchCursorCompletableFuture = batchCursorCompletableFuture; } + @Override + public Context currentContext() { + return sub.currentContext(); + } + @Override public void onSubscribe(final Subscription s) { sub.onSubscribe(s); From 4c44db92b1e0e4e266ec7d9c77ee4d65fd59db55 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 1 Mar 2024 16:18:21 -0800 Subject: [PATCH 08/18] Account for RTT. JAVA-4054 --- .../client/ClientSideOperationTimeoutProseTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index f3d033a74cc..934f939170f 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -335,7 +335,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(1000, TimeUnit.MILLISECONDS))) { + .timeout(rtt + 1000, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -395,7 +395,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(300, TimeUnit.MILLISECONDS))) { + .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); From 4fd4d96e768c83e8b1d056c417076cbac0a64cec Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 1 Mar 2024 17:02:11 -0800 Subject: [PATCH 09/18] Add assumption to the test. JAVA-4054 --- .../client/ClientSideOperationTimeoutProseTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 934f939170f..9d5e14c84f5 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -56,12 +56,15 @@ import java.util.concurrent.TimeoutException; import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; +import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.isServerlessTest; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -225,6 +228,8 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I @Test public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); //given long rtt = ClusterFixture.getPrimaryRTT(); @@ -279,6 +284,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { @Test public void testTimeoutMSAppliedToInitialAggregate() { assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); //given long rtt = ClusterFixture.getPrimaryRTT(); @@ -326,6 +333,8 @@ public void testTimeoutMSAppliedToInitialAggregate() { @Test public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); @@ -387,6 +396,8 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { @Test public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); @@ -441,6 +452,8 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { @Test public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInternally() { assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); //given long rtt = ClusterFixture.getPrimaryRTT(); From de6a9b35349cfb62c9eac8e99d753e25fb80001e Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 12:16:46 -0800 Subject: [PATCH 10/18] Reset onEachOperator hook after tests. JAVA-4054 --- .../reactivestreams/client/syncadapter/SyncMongoCursor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index 6b1d83e4812..651aba29c32 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -99,6 +99,7 @@ public void onComplete() { throw new MongoTimeoutException("Timeout waiting for subscription"); } batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + Hooks.resetOnEachOperator(); sleep(getSleepAfterCursorOpen()); } catch (InterruptedException e) { throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); From cb69eb44949e6dde4c9bd4499fff3ee9843d38ce Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 13:20:17 -0800 Subject: [PATCH 11/18] Improve test error output. JAVA-4054 --- .../client/ClientSideOperationTimeoutProseTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 9d5e14c84f5..8874ce73dc2 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; @@ -353,7 +354,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + " configureFailPoint: \"failCommand\"," + " mode: { times: 2}," + " data: {" - + " failCommands: [\"getMore\" ]," + + " failCommands: [\"getMore\", \"aggregate\"]," + " blockConnection: true," + " blockTimeMS: " + (rtt + 800) + " }" @@ -364,7 +365,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { .startAtOperationTime(startTime); StepVerifier.create(documentChangeStreamPublisher, 2) //then - .thenAwait(Duration.ofMillis(300)) + .thenAwait(Duration.ofMillis(1000)) .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, BsonDocument.parse("{x: 1}"), BsonDocument.parse("{x: 2}"), @@ -481,7 +482,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt private static void assertCommandStartedEventsInOder(final List expectedCommandNames, final List commandStartedEvents) { - assertEquals(expectedCommandNames.size(), commandStartedEvents.size()); + assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: " + + commandStartedEvents.stream().map(CommandStartedEvent::getCommandName).collect(Collectors.toList())); for (int i = 0; i < expectedCommandNames.size(); i++) { CommandStartedEvent commandStartedEvent = commandStartedEvents.get(i); From 9507840d87fd2ff0744737be6b44938c7adc0d85 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 14:15:37 -0800 Subject: [PATCH 12/18] Improve test error output. JAVA-4054 --- .../ClientSideOperationTimeoutProseTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 8874ce73dc2..6d9b2f2c8eb 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -345,7 +345,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 1000, TimeUnit.MILLISECONDS))) { + .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -356,7 +356,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + " data: {" + " failCommands: [\"getMore\", \"aggregate\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 800) + + " blockTimeMS: " + (rtt + 200) + " }" + "}"); @@ -365,17 +365,17 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { .startAtOperationTime(startTime); StepVerifier.create(documentChangeStreamPublisher, 2) //then - .thenAwait(Duration.ofMillis(1000)) + .thenAwait(Duration.ofMillis(300)) .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, BsonDocument.parse("{x: 1}"), BsonDocument.parse("{x: 2}"), BsonDocument.parse("{x: 3}"), BsonDocument.parse("{x: 4}"))) .expectNextCount(2) - .thenAwait(Duration.ofMillis(1000)) + .thenAwait(Duration.ofMillis(300)) .thenRequest(2) .expectNextCount(2) - .thenAwait(Duration.ofMillis(1000)) + .thenAwait(Duration.ofMillis(300)) .thenRequest(2) .expectError(MongoOperationTimeoutException.class) .verify(); @@ -483,7 +483,10 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt private static void assertCommandStartedEventsInOder(final List expectedCommandNames, final List commandStartedEvents) { assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: " + - commandStartedEvents.stream().map(CommandStartedEvent::getCommandName).collect(Collectors.toList())); + commandStartedEvents.stream() + .map(CommandStartedEvent::getCommand) + .map(BsonDocument::toJson) + .collect(Collectors.toList())); for (int i = 0; i < expectedCommandNames.size(); i++) { CommandStartedEvent commandStartedEvent = commandStartedEvents.get(i); From 0a0e606ad6d161d08a2a4b447c9fc43b20f6ddc0 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 16:25:54 -0800 Subject: [PATCH 13/18] Change test data order. JAVA-4054 --- .../ClientSideOperationTimeoutProseTest.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 6d9b2f2c8eb..083cca505f3 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -352,7 +352,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," - + " mode: { times: 2}," + + " mode: { times: 3}," + " data: {" + " failCommands: [\"getMore\", \"aggregate\"]," + " blockConnection: true," @@ -360,17 +360,24 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + " }" + "}"); + collectionHelper.insertDocuments(WriteConcern.MAJORITY, + BsonDocument.parse("{x: 1}"), + BsonDocument.parse("{x: 2}"), + + BsonDocument.parse("{x: 3}"), + BsonDocument.parse("{x: 4}"), + + BsonDocument.parse("{x: 5}"), + BsonDocument.parse("{x: 6}")); + //when ChangeStreamPublisher documentChangeStreamPublisher = collection.watch() .startAtOperationTime(startTime); StepVerifier.create(documentChangeStreamPublisher, 2) //then + .expectNextCount(2) .thenAwait(Duration.ofMillis(300)) - .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, - BsonDocument.parse("{x: 1}"), - BsonDocument.parse("{x: 2}"), - BsonDocument.parse("{x: 3}"), - BsonDocument.parse("{x: 4}"))) + .thenRequest(2) .expectNextCount(2) .thenAwait(Duration.ofMillis(300)) .thenRequest(2) @@ -380,7 +387,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { .expectError(MongoOperationTimeoutException.class) .verify(); - sleep(200); //let publisher invalidate the cursor after the error. + sleep(500); //let publisher invalidate the cursor after the error. List commandStartedEvents = commandListener.getCommandStartedEvents(); List expectedCommandNames = Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"); @@ -422,22 +429,27 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { + " }" + "}"); + + collectionHelper.insertDocuments(WriteConcern.MAJORITY, + BsonDocument.parse("{x: 1}"), + BsonDocument.parse("{x: 2}"), + + BsonDocument.parse("{x: 3}"), + BsonDocument.parse("{x: 4}")); + //when ChangeStreamPublisher documentChangeStreamPublisher = collection.watch() .maxAwaitTime(1, TimeUnit.MILLISECONDS) .startAtOperationTime(startTime); StepVerifier.create(documentChangeStreamPublisher, 2) //then - .thenAwait(Duration.ofMillis(300)) - .then(() -> collectionHelper.insertDocuments(WriteConcern.MAJORITY, - BsonDocument.parse("{x: 1}"), - BsonDocument.parse("{x: 2}"))) - .thenAwait(Duration.ofMillis(300)) + .expectNextCount(2) + .thenAwait(Duration.ofMillis(600)) .thenRequest(2) .thenCancel() .verify(); - sleep(200); //let publisher invalidate the cursor after the error. + sleep(500); //let publisher invalidate the cursor after the error. List commandStartedEvents = commandListener.getCommandStartedEvents(); List expectedCommandNames = Arrays.asList("aggregate", "getMore", "killCursors"); From 161667eb8a2f99101a2704c5a0c5c120ff1bee73 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 23:00:25 -0800 Subject: [PATCH 14/18] Remove BatchCursorInterceptor. JAVA-4054 --- .../internal/AggregatePublisherImpl.java | 3 +- .../client/internal/BatchCursorPublisher.java | 13 ++++ .../internal/ChangeStreamPublisherImpl.java | 3 +- .../client/internal/FindPublisherImpl.java | 2 +- .../ClientSideOperationTimeoutProseTest.java | 4 +- .../client/syncadapter/SyncMongoCursor.java | 73 +------------------ .../ClientSideOperationTimeoutTest.java | 4 +- .../internal/AggregateIterableImpl.java | 3 +- .../internal/ChangeStreamIterableImpl.java | 3 +- .../client/internal/FindIterableImpl.java | 2 +- .../client/internal/MongoIterableImpl.java | 13 ++++ 11 files changed, 39 insertions(+), 84 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java index 73ae8a05a6f..26aabdbe6b2 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java @@ -94,8 +94,7 @@ public AggregatePublisher maxTime(final long maxTime, final TimeUnit timeUnit @Override public AggregatePublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java index a45e691b72e..cf5a9d9f25b 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java @@ -31,10 +31,12 @@ import org.reactivestreams.Subscriber; import reactor.core.publisher.Mono; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PROTECTED) @@ -151,4 +153,15 @@ Mono> batchCursor(final Supplier maxAwaitTimeMS); + + return maxAwaitTimeMS; + } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java index ab7ade7d354..03c54f4f942 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java @@ -124,8 +124,7 @@ public ChangeStreamPublisher comment(@Nullable final BsonValue comment) { @Override public ChangeStreamPublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java index 069954d15a1..eeddef5449a 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java @@ -79,7 +79,7 @@ public FindPublisher maxTime(final long maxTime, final TimeUnit timeUnit) { @Override public FindPublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); + validateMaxAwaitTime(maxAwaitTime, timeUnit); findOptions.maxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 083cca505f3..8fcf38c787e 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -494,8 +494,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt private static void assertCommandStartedEventsInOder(final List expectedCommandNames, final List commandStartedEvents) { - assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: " + - commandStartedEvents.stream() + assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: " + + commandStartedEvents.stream() .map(CommandStartedEvent::getCommand) .map(BsonDocument::toJson) .collect(Collectors.toList())); diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index 651aba29c32..04f79455b43 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -21,24 +21,16 @@ import com.mongodb.ServerCursor; import com.mongodb.client.MongoCursor; import com.mongodb.lang.Nullable; -import com.mongodb.reactivestreams.client.internal.BatchCursor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Operators; -import reactor.util.context.Context; import java.util.NoSuchElementException; import java.util.concurrent.BlockingDeque; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; @@ -50,7 +42,6 @@ class SyncMongoCursor implements MongoCursor { private static final Object COMPLETED = new Object(); private final BlockingDeque results = new LinkedBlockingDeque<>(); - private final CompletableFuture batchCursorCompletableFuture = new CompletableFuture<>(); private final Integer batchSize; private int countToBatchSize; private Subscription subscription; @@ -60,11 +51,7 @@ class SyncMongoCursor implements MongoCursor { SyncMongoCursor(final Publisher publisher, @Nullable final Integer batchSize) { this.batchSize = batchSize; - CountDownLatch subscriptionLatch = new CountDownLatch(1); - Hooks.onEachOperator(Operators.lift((sc, sub) -> - new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture - ))); - + CountDownLatch latch = new CountDownLatch(1); //noinspection ReactiveStreamsSubscriberImplementation Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber() { @Override @@ -75,7 +62,7 @@ public void onSubscribe(final Subscription s) { } else { subscription.request(batchSize); } - subscriptionLatch.countDown(); + latch.countDown(); } @Override @@ -95,20 +82,12 @@ public void onComplete() { } }); try { - if (!subscriptionLatch.await(TIMEOUT, TimeUnit.SECONDS)) { + if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Timeout waiting for subscription"); } - batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); - Hooks.resetOnEachOperator(); sleep(getSleepAfterCursorOpen()); } catch (InterruptedException e) { throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); - } catch (ExecutionException | TimeoutException e) { - Throwable cause = e.getCause(); - if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } - throw new RuntimeException(e); } } @@ -205,50 +184,4 @@ private RuntimeException translateError(final Throwable throwable) { } return new RuntimeException(throwable); } - - - static class BatchCursorInterceptSubscriber implements CoreSubscriber { - - private final CoreSubscriber sub; - private final CompletableFuture batchCursorCompletableFuture; - - - BatchCursorInterceptSubscriber(final CoreSubscriber sub, - final CompletableFuture batchCursorCompletableFuture) { - this.sub = sub; - this.batchCursorCompletableFuture = batchCursorCompletableFuture; - } - - @Override - public Context currentContext() { - return sub.currentContext(); - } - - @Override - public void onSubscribe(final Subscription s) { - sub.onSubscribe(s); - } - - @Override - public void onNext(final Object o) { - if (o instanceof BatchCursor) { - // Interception of a cursor means that it has been created at this point. - batchCursorCompletableFuture.complete((BatchCursor) o); - } - sub.onNext(o); - } - - @Override - public void onError(final Throwable t) { - if (!batchCursorCompletableFuture.isDone()) { // cursor has not been created yet but an error occurred. - batchCursorCompletableFuture.completeExceptionally(t); - } - sub.onError(t); - } - - @Override - public void onComplete() { - sub.onComplete(); - } - } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java index e50a3a9b30e..cdf21cadc8d 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java @@ -56,7 +56,6 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String assumeFalse("No iterateOnce support. There is alternative prose test for it.", testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set")); - assumeFalse("No iterateOnce support. There is alternative prose test for it.", testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is set")); /* @@ -66,9 +65,10 @@ The Reactive Streams specification prevents us from allowing a subsequent next c */ assumeFalse("It is not possible due to a conflict with the Reactive Streams specification .", testDescription.equals("change stream can be iterated again if previous iteration times out")); - assumeFalse("Flaky and racy due to asynchronous behaviour. There is alternative prose test for it.", testDescription.equals("timeoutMS applies to full resume attempt in a next call")); + assumeFalse("No way to catch an error on BarchCursor creation. There is alternative prose test for it.", + testDescription.equals("timeoutMS applied to initial aggregate")); assumeFalse(testDescription.endsWith("createChangeStream on client")); assumeFalse(testDescription.endsWith("createChangeStream on database")); diff --git a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java index ad052738948..4feb06fa9b9 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java @@ -132,8 +132,7 @@ public AggregateIterable maxTime(final long maxTime, final TimeUnit tim @Override public AggregateIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java index 51da1c8fd1e..4b7b3865569 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java @@ -115,8 +115,7 @@ public ChangeStreamIterable batchSize(final int batchSize) { @Override public ChangeStreamIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java index 2d6fb54b333..df8b17866ac 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java @@ -90,7 +90,7 @@ public FindIterable maxTime(final long maxTime, final TimeUnit timeUnit @Override public FindIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); + validateMaxAwaitTime(maxAwaitTime, timeUnit); findOptions.maxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java index 0e6da7b906b..d4b948c07a1 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java @@ -29,8 +29,10 @@ import com.mongodb.lang.Nullable; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; /** @@ -154,4 +156,15 @@ private BatchCursor execute() { return getExecutor().execute(asReadOperation(), readPreference, readConcern, clientSession); } + + protected long validateMaxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { + notNull("timeUnit", timeUnit); + Long timeoutMS = timeoutSettings.getTimeoutMS(); + long maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + + isTrueArgument("maxAwaitTimeMS must be less than timeoutMS", timeoutMS == null || timeoutMS == 0 + || timeoutMS > maxAwaitTimeMS); + + return maxAwaitTimeMS; + } } From ecac5444692b216ab122d6b522a5612618488cc1 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 4 Mar 2024 23:19:16 -0800 Subject: [PATCH 15/18] Fix checkstyle. JAVA-4054 --- .../client/internal/ChangeStreamPublisherImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java index 03c54f4f942..8fc1a093aab 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java @@ -41,7 +41,6 @@ import java.util.function.Function; import static com.mongodb.assertions.Assertions.notNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; final class ChangeStreamPublisherImpl extends BatchCursorPublisher> From 6798ceb3530235a65328aedfe698ab16561c5825 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 6 Mar 2024 16:50:53 -0800 Subject: [PATCH 16/18] Add additional javadoc. Remove additional call to resumeChangeStream. JAVA-4054 --- build.gradle | 4 +- .../operation/AsyncCommandBatchCursor.java | 4 +- .../operation/ChangeStreamBatchCursor.java | 40 +++++++++++++------ .../operation/ChangeStreamOperation.java | 4 +- .../operation/CommandBatchCursor.java | 17 +++++--- .../operation/CursorResourceManager.java | 10 ++--- .../ChangeStreamBatchCursorTest.java | 8 +--- 7 files changed, 51 insertions(+), 36 deletions(-) diff --git a/build.gradle b/build.gradle index 96e78e583aa..8a79e98a26e 100644 --- a/build.gradle +++ b/build.gradle @@ -257,8 +257,8 @@ configure(javaCodeCheckedProjects) { testImplementation platform('org.spockframework:spock-bom:2.1-groovy-3.0') testImplementation 'org.spockframework:spock-core' testImplementation 'org.spockframework:spock-junit4' - testImplementation("org.mockito:mockito-core:4.0.0") - testImplementation("org.mockito:mockito-inline:4.0.0") + testImplementation 'org.mockito:mockito-core:4.0.0' + testImplementation 'org.mockito:mockito-inline:4.0.0' testImplementation 'cglib:cglib-nodep:2.2.2' testImplementation 'org.objenesis:objenesis:1.3' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 11792d09cf2..0f29c14dada 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -212,8 +212,8 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } - void setCloseImmediately(final boolean closeImmediately) { - this.resourceManager.setCloseImmediately(closeImmediately); + void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { + this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); } @ThreadSafe diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 0c79cb708ec..85df6396d0f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -39,6 +39,21 @@ import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError; import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource; +/** + * A change stream cursor that wraps {@link CommandBatchCursor} with automatic resumption capabilities in the event + * of timeouts or transient errors. + *

+ * Upon encountering a resumable error during {@code hasNext()}, {@code next()}, or {@code tryNext()} calls, the {@link ChangeStreamBatchCursor} + * attempts to establish a new change stream on the server. + *

+ * If an error occurring during any of these method calls is not resumable, it is immediately propagated to the caller, and the {@link ChangeStreamBatchCursor} + * is closed and invalidated on the server. Server errors that occur during this invalidation process are not propagated to the caller. + *

+ * A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller. + * Subsequent method call will attempt to resume operation by establishing a new change stream on the server, without doing {@code getMore} + * request first. + *

+ */ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor { private final ReadBinding binding; private final ChangeStreamOperation changeStreamOperation; @@ -50,10 +65,10 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor changeStreamOperation, final CommandBatchCursor wrapped, @@ -67,7 +82,7 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor getWrapped() { @@ -198,26 +213,29 @@ static List convertAndProduceLastId(final List rawDocume R resumeableOperation(final Function, R> function) { resetTimeout(); try { - resumeAfterTimeout(); R result = execute(function); - lastOpTimeoutStatus = false; + lastOperationTimedOut = false; return result; } catch (Throwable exception) { - lastOpTimeoutStatus = isTimeoutException(exception); + lastOperationTimedOut = isTimeoutException(exception); throw exception; } } private R execute(final Function, R> function) { + boolean shouldBeResumed = hasPreviousNextTimedOut(); while (true) { + if (shouldBeResumed) { + resumeChangeStream(); + } try { return function.apply(wrapped); } catch (Throwable t) { if (!isResumableError(t, maxWireVersion)) { throw MongoException.fromThrowableNonNull(t); } + shouldBeResumed = true; } - resumeChangeStream(); } } @@ -232,10 +250,8 @@ private void resumeChangeStream() { binding.release(); // release the new change stream batch cursor's reference to the binding } - private void resumeAfterTimeout() { - if (lastOpTimeoutStatus && !closed.get()) { - resumeChangeStream(); - } + private boolean hasPreviousNextTimedOut() { + return lastOperationTimedOut && !closed.get(); } private void resetTimeout() { diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 005a91ff52f..6231e98de12 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -196,7 +196,7 @@ private AggregateOperationImpl getAggregateOperation(final Time public BatchCursor execute(final ReadBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); CommandBatchCursor cursor = (CommandBatchCursor) getAggregateOperation(timeoutContext).execute(binding); - cursor.setCloseImmediately(true); + cursor.setCloseWithoutTimeoutReset(true); return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), @@ -211,7 +211,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb callback.onResult(null, t); } else { AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result); - cursor.setCloseImmediately(true); + cursor.setCloseWithoutTimeoutReset(true); callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index 395e7119eda..c9582091655 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -24,13 +24,14 @@ import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerType; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.OperationContext; -import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonTimestamp; @@ -59,9 +60,6 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { private final MongoNamespace namespace; - /** - * maxAwaitTimeMS - */ private final long maxTimeMS; private final Decoder decoder; @Nullable @@ -261,9 +259,16 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } - void setCloseImmediately(final boolean closeImmediately) { - this.resourceManager.setCloseImmediately(closeImmediately); + /** + * Configures the cursor's behavior to close without resetting its timeout. If {@code true}, the cursor attempts to close immediately + * without resetting its {@link TimeoutContext#getTimeout()} if present. This is useful when managing the cursor's close behavior externally. + * + * @param closeWithoutTimeoutReset + */ + void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { + this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); } + @ThreadSafe private static final class ResourceManager extends CursorResourceManager { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java index 8e692cef357..93f474ec981 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java @@ -67,7 +67,7 @@ abstract class CursorResourceManager RESULT_FROM_NEW_CURSOR = new ArrayList<>(); - @Rule - public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); private final int maxWireVersion = ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION; private ServerDescription serverDescription; private TimeoutContext timeoutContext; @@ -78,7 +72,7 @@ final class ChangeStreamBatchCursorTest { @Test @DisplayName("should return result on next") - public void shouldReturnResultOnNext() { + void shouldReturnResultOnNext() { when(commandBatchCursor.next()).thenReturn(RESULT_FROM_NEW_CURSOR); ChangeStreamBatchCursor cursor = createChangeStreamCursor(); From e8f3cf0712f144a8639acf46d06b6d2b817628dd Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 6 Mar 2024 17:04:40 -0800 Subject: [PATCH 17/18] Fix typo. JAVA-4054 --- .../com/mongodb/internal/operation/AggregateOperationImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index 8373d119d8d..1785e343e94 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -153,7 +153,7 @@ AggregateOperationImpl retryReads(final boolean retryReads) { /** * When {@link TimeoutContext#hasTimeoutMS()} then {@link TimeoutSettings#getMaxAwaitTimeMS()} usage in {@code getMore} commands * depends on the type of cursor. For {@link CursorType#TailableAwait} it is used, for others it is not. - * {@link CursorType#TailableAwait} is used mainly used for change streams in {@link AggregateOperationImpl}. + * {@link CursorType#TailableAwait} is used mainly for change streams in {@link AggregateOperationImpl}. * * @param cursorType * @return this From 84b0971ab21d64d456b2f4a093ba228ef2fc8076 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 11 Mar 2024 21:44:57 -0700 Subject: [PATCH 18/18] Add javadoc. JAVA-5322 --- .../main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt | 4 ++++ .../main/com/mongodb/client/MongoChangeStreamCursor.java | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt index 69c755af9c0..32f25fe0fd2 100644 --- a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt +++ b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt @@ -76,6 +76,10 @@ public sealed interface MongoCursor : Iterator, Closeable { * } * ``` * + * A [com.mongodb.MongoOperationTimeoutException] does not invalidate the [MongoChangeStreamCursor], but is immediately + * propagated to the caller. Subsequent method calls will attempt to resume operation by establishing a new change + * stream on the server, without performing a `getMore` request first. + * * If a [com.mongodb.MongoOperationTimeoutException] occurs before any events are received, it indicates that the server * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the * current stream and recreate it with a higher timeout setting. diff --git a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java index 1408b43b7a1..ed58412496d 100644 --- a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java +++ b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java @@ -33,11 +33,15 @@ * } * } * + * + *

+ * A {@link com.mongodb.MongoOperationTimeoutException} does not invalidate the {@link MongoChangeStreamCursor}, but is immediately + * propagated to the caller. Subsequent method call will attempt to resume operation by establishing a new change stream on the server, + * without doing {@code getMore} request first.

*

* If a {@link com.mongodb.MongoOperationTimeoutException} occurs before any events are received, it indicates that the server * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the current stream * and recreate it with a higher timeout setting. - *

* * @since 3.11 * @param The type of documents the cursor contains