diff --git a/build.gradle b/build.gradle index 9b20fb8763d..8a79e98a26e 100644 --- a/build.gradle +++ b/build.gradle @@ -257,8 +257,8 @@ configure(javaCodeCheckedProjects) { testImplementation platform('org.spockframework:spock-bom:2.1-groovy-3.0') testImplementation 'org.spockframework:spock-core' testImplementation 'org.spockframework:spock-junit4' - testImplementation("org.mockito:mockito-core:3.8.0") - testImplementation("org.mockito:mockito-inline:3.8.0") + testImplementation 'org.mockito:mockito-core:4.0.0' + testImplementation 'org.mockito:mockito-inline:4.0.0' testImplementation 'cglib:cglib-nodep:2.2.2' testImplementation 'org.objenesis:objenesis:1.3' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 7fb706b767e..1651a626c6e 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -167,7 +167,7 @@ public TimeoutSettings getTimeoutSettings() { } public long getMaxAwaitTimeMS() { - return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS(); + return timeoutSettings.getMaxAwaitTimeMS(); } public long getMaxTimeMS() { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index 0edc474d9da..1785e343e94 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -16,9 +16,12 @@ package com.mongodb.internal.operation; +import com.mongodb.CursorType; import com.mongodb.MongoNamespace; import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.client.model.Collation; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; @@ -68,6 +71,7 @@ class AggregateOperationImpl implements AsyncReadOperation pipeline, final Decoder decoder, final AggregationLevel aggregationLevel) { @@ -146,6 +150,19 @@ AggregateOperationImpl retryReads(final boolean retryReads) { return this; } + /** + * When {@link TimeoutContext#hasTimeoutMS()} then {@link TimeoutSettings#getMaxAwaitTimeMS()} usage in {@code getMore} commands + * depends on the type of cursor. For {@link CursorType#TailableAwait} it is used, for others it is not. + * {@link CursorType#TailableAwait} is used mainly for change streams in {@link AggregateOperationImpl}. + * + * @param cursorType + * @return this + */ + AggregateOperationImpl cursorType(final CursorType cursorType) { + this.cursorType = cursorType; + return this; + } + boolean getRetryReads() { return retryReads; } @@ -221,13 +238,13 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi private CommandReadTransformer> transformer() { return (result, source, connection) -> new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? 
batchSize : 0, - getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection); + getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection); } private CommandReadTransformerAsync> asyncTransformer() { return (result, source, connection) -> new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0, - getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection); + getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection); } private TimeoutMode getTimeoutMode() { @@ -238,8 +255,12 @@ private TimeoutMode getTimeoutMode() { return localTimeoutMode; } - private long getMaxTimeForCursor(final OperationContext operationContext) { - return operationContext.getTimeoutContext().getMaxAwaitTimeMS(); + private long getMaxTimeForCursor(final TimeoutContext timeoutContext) { + long maxAwaitTimeMS = timeoutContext.getMaxAwaitTimeMS(); + if (timeoutContext.hasTimeoutMS()){ + return CursorType.TailableAwait == cursorType ? maxAwaitTimeMS : 0; + } + return maxAwaitTimeMS; } interface AggregateTarget { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index 7e55f05cac5..a5d958880d3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -17,6 +17,7 @@ package com.mongodb.internal.operation; import com.mongodb.MongoException; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; @@ -42,6 +43,7 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBatchCursor { private final AsyncReadBinding binding; + private final TimeoutContext timeoutContext; private final ChangeStreamOperation changeStreamOperation; private final int maxWireVersion; @@ -63,6 +65,7 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBat this.wrapped = new AtomicReference<>(assertNotNull(wrapped)); this.binding = binding; binding.retain(); + this.timeoutContext = binding.getOperationContext().getTimeoutContext(); this.resumeToken = resumeToken; this.maxWireVersion = maxWireVersion; isClosed = new AtomicBoolean(); @@ -80,6 +83,7 @@ public void next(final SingleResultCallback> callback) { @Override public void close() { + resetTimeout(); if (isClosed.compareAndSet(false, true)) { try { nullifyAndCloseWrapped(); @@ -177,6 +181,7 @@ private interface AsyncBlock { } private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback> callback, final boolean tryNext) { + resetTimeout(); SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (isClosed()) { errHandlingCallback.onResult(null, new MongoException(format("%s called after the cursor was closed.", @@ -219,12 +224,12 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, assertNotNull(source).getServerDescription().getMaxWireVersion()); source.release(); - changeStreamOperation.executeAsync(binding, (result, t1) -> { + changeStreamOperation.executeAsync(binding, 
(asyncBatchCursor, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { try { - setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) result).getWrapped()); + setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) asyncBatchCursor).getWrapped()); } finally { try { binding.release(); // release the new change stream batch cursor's reference to the binding @@ -237,4 +242,10 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb } }); } + + private void resetTimeout() { + if (timeoutContext.hasTimeoutMS()) { + timeoutContext.resetTimeout(); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 53bda3a505d..0f29c14dada 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -212,6 +212,10 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } + void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { + this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); + } + @ThreadSafe private static final class ResourceManager extends CursorResourceManager { diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index a3c134b720c..85df6396d0f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -18,8 +18,10 @@ import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoException; +import com.mongodb.MongoOperationTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -37,26 +39,50 @@ import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError; import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource; +/** + * A change stream cursor that wraps {@link CommandBatchCursor} with automatic resumption capabilities in the event + * of timeouts or transient errors. + *

+ * Upon encountering a resumable error during {@code hasNext()}, {@code next()}, or {@code tryNext()} calls, the {@link ChangeStreamBatchCursor}
+ * attempts to establish a new change stream on the server.
+ * <p>

+ * If an error occurring during any of these method calls is not resumable, it is immediately propagated to the caller, and the {@link ChangeStreamBatchCursor}
+ * is closed and invalidated on the server. Server errors that occur during this invalidation process are not propagated to the caller.
+ * <p>

+ * A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller.
+ * Subsequent method calls will attempt to resume the operation by establishing a new change stream on the server, without performing a
+ * {@code getMore} request first.
+ * <p>

+ */ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor { private final ReadBinding binding; private final ChangeStreamOperation changeStreamOperation; private final int maxWireVersion; - + private final TimeoutContext timeoutContext; private CommandBatchCursor wrapped; private BsonDocument resumeToken; private final AtomicBoolean closed; + /** + * This flag is used to manage change stream resumption logic after a timeout error. + * Indicates whether the last {@code hasNext()}, {@code next()}, or {@code tryNext()} call resulted in a {@link MongoOperationTimeoutException}. + * If {@code true}, indicates a timeout occurred, prompting an attempt to resume the change stream on the subsequent call. + */ + private boolean lastOperationTimedOut; + ChangeStreamBatchCursor(final ChangeStreamOperation changeStreamOperation, final CommandBatchCursor wrapped, final ReadBinding binding, @Nullable final BsonDocument resumeToken, final int maxWireVersion) { + this.timeoutContext = binding.getOperationContext().getTimeoutContext(); this.changeStreamOperation = changeStreamOperation; this.binding = binding.retain(); this.wrapped = wrapped; this.resumeToken = resumeToken; this.maxWireVersion = maxWireVersion; closed = new AtomicBoolean(); + lastOperationTimedOut = false; } CommandBatchCursor getWrapped() { @@ -107,6 +133,7 @@ public List tryNext() { @Override public void close() { if (!closed.getAndSet(true)) { + resetTimeout(); wrapped.close(); binding.release(); } @@ -184,22 +211,56 @@ static List convertAndProduceLastId(final List rawDocume } R resumeableOperation(final Function, R> function) { + resetTimeout(); + try { + R result = execute(function); + lastOperationTimedOut = false; + return result; + } catch (Throwable exception) { + lastOperationTimedOut = isTimeoutException(exception); + throw exception; + } + } + + private R execute(final Function, R> function) { + boolean shouldBeResumed = hasPreviousNextTimedOut(); while (true) { + if (shouldBeResumed) { + resumeChangeStream(); + } try { return function.apply(wrapped); } catch (Throwable t) { if (!isResumableError(t, maxWireVersion)) { throw MongoException.fromThrowableNonNull(t); } + shouldBeResumed = true; } - wrapped.close(); + } + } + + private void resumeChangeStream() { + wrapped.close(); + + withReadConnectionSource(binding, source -> { + changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + return null; + }); + wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); + binding.release(); // release the new change stream batch cursor's reference to the binding + } + + private boolean hasPreviousNextTimedOut() { + return lastOperationTimedOut && !closed.get(); + } - withReadConnectionSource(binding, source -> { - changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); - return null; - }); - wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); - binding.release(); // release the new change stream batch cursor's reference to the binding + private void resetTimeout() { + if (timeoutContext.hasTimeoutMS()) { + timeoutContext.resetTimeout(); } } + + private static boolean isTimeoutException(final Throwable exception) { + return exception instanceof MongoOperationTimeoutException; + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java 
b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 0afc4c5f1d0..6231e98de12 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -16,10 +16,12 @@ package com.mongodb.internal.operation; +import com.mongodb.CursorType; import com.mongodb.MongoNamespace; import com.mongodb.client.model.Collation; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; @@ -42,6 +44,7 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.client.cursor.TimeoutMode.CURSOR_LIFETIME; /** * An operation that executes an {@code $changeStream} aggregation. @@ -71,7 +74,7 @@ public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument final FullDocumentBeforeChange fullDocumentBeforeChange, final List pipeline, final Decoder decoder, final ChangeStreamLevel changeStreamLevel) { this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC, getAggregateTarget(), - getPipelineCreator()); + getPipelineCreator()).cursorType(CursorType.TailableAwait); this.fullDocument = notNull("fullDocument", fullDocument); this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange); this.decoder = notNull("decoder", decoder); @@ -167,9 +170,34 @@ public ChangeStreamOperation showExpandedEvents(final boolean showExpandedEve return this; } + /** + * Gets an aggregate operation with consideration for timeout settings. + *

+ * Change streams act similarly to tailable awaitData cursors, with identical timeoutMS option behavior.
+ * Key distinctions include:
+ * - The timeoutMS option must be applied at the start of the aggregate operation for change streams.
+ * - Change streams support resumption on next() calls. The driver handles automatic resumption for transient errors.
+ * <p>

+ * + * As a result, when {@code timeoutContext.hasTimeoutMS()} the CURSOR_LIFETIME setting is utilized to manage the underlying cursor's + * lifespan in change streams. + * + * @param timeoutContext + * @return An AggregateOperationImpl + */ + private AggregateOperationImpl getAggregateOperation(final TimeoutContext timeoutContext) { + if (timeoutContext.hasTimeoutMS()) { + return wrapped.timeoutMode(CURSOR_LIFETIME); + } + return wrapped; + } + @Override public BatchCursor execute(final ReadBinding binding) { - CommandBatchCursor cursor = (CommandBatchCursor) wrapped.execute(binding); + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); + CommandBatchCursor cursor = (CommandBatchCursor) getAggregateOperation(timeoutContext).execute(binding); + cursor.setCloseWithoutTimeoutReset(true); + return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()); @@ -177,11 +205,14 @@ public BatchCursor execute(final ReadBinding binding) { @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { - wrapped.executeAsync(binding, (result, t) -> { + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); + getAggregateOperation(timeoutContext).executeAsync(binding, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result); + cursor.setCloseWithoutTimeoutReset(true); + callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index d80803baa14..c9582091655 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -27,6 +27,7 @@ import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerType; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.connection.Connection; @@ -258,6 +259,16 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } + /** + * Configures the cursor's behavior to close without resetting its timeout. If {@code true}, the cursor attempts to close immediately + * without resetting its {@link TimeoutContext#getTimeout()} if present. This is useful when managing the cursor's close behavior externally. 
+ * + * @param closeWithoutTimeoutReset + */ + void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { + this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); + } + @ThreadSafe private static final class ResourceManager extends CursorResourceManager { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java index 48a64081b05..93f474ec981 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java @@ -67,6 +67,7 @@ abstract class CursorResourceManager 0) { diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java index 5be40320040..32703bd39c5 100644 --- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java +++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java @@ -176,6 +176,10 @@ public void insertDocuments(final BsonDocument... documents) { insertDocuments(asList(documents)); } + public void insertDocuments(final WriteConcern writeConcern, final BsonDocument... documents) { + insertDocuments(asList(documents), writeConcern); + } + public void insertDocuments(final List documents) { insertDocuments(documents, getBinding()); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index c02065363f9..c246a75cfcf 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -144,6 +144,10 @@ public CommandFailedEvent getCommandFailedEvent(final String commandName) { .orElseThrow(() -> new IllegalArgumentException(commandName + " not found in command failed event list")); } + public List getCommandFailedEvents() { + return getEvents(CommandFailedEvent.class, Integer.MAX_VALUE); + } + public List getCommandStartedEvents() { return getEvents(CommandStartedEvent.class, Integer.MAX_VALUE); } diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json index 6f9abf549ea..9cb4db6cf47 100644 --- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json +++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/change-streams.json @@ -311,7 +311,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 20 + "timeoutMS": 80 }, "saveResultAsEntity": "changeStream" }, @@ -331,7 +331,7 @@ "aggregate" ], "blockConnection": true, - "blockTimeMS": 12, + "blockTimeMS": 40, "errorCode": 7, "errorLabels": [ "ResumableChangeStreamError" @@ -535,7 +535,7 @@ "getMore" ], "blockConnection": true, - "blockTimeMS": 15 + "blockTimeMS": 150 } } } @@ -545,7 +545,7 @@ "object": "collection", "arguments": { "pipeline": [], - "timeoutMS": 10 + "timeoutMS": 100 }, "saveResultAsEntity": "changeStream" }, diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 
018e11712e0..8770dbe3b24 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -61,7 +61,6 @@ Collection timeoutContextTest() { assertAll( () -> assertTrue(timeoutContext.hasTimeoutMS()), () -> assertTrue(timeoutContext.getMaxTimeMS() > 0), - () -> assertEquals(0, timeoutContext.getMaxAwaitTimeMS()), () -> assertTrue(timeoutContext.getMaxCommitTimeMS() > 0) ); }), @@ -91,6 +90,13 @@ Collection timeoutContextTest() { () -> assertEquals(0, timeoutContext.getMaxCommitTimeMS()) ); }), + dynamicTest("MaxAwaitTimeMS set with timeoutMS", () -> { + TimeoutContext timeoutContext = + new TimeoutContext(TIMEOUT_SETTINGS_WITH_MAX_AWAIT_TIME.withWTimeoutMS(1L)); + assertAll( + () -> assertEquals(101, timeoutContext.getMaxAwaitTimeMS()) + ); + }), dynamicTest("MaxTimeMS and MaxAwaitTimeMS set", () -> { TimeoutContext timeoutContext = new TimeoutContext(TIMEOUT_SETTINGS_WITH_MAX_TIME_AND_AWAIT_TIME); diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy index 4381e54f2e5..998c0a28b6e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncChangeStreamBatchCursorSpecification.groovy @@ -18,8 +18,10 @@ package com.mongodb.internal.operation import com.mongodb.MongoException import com.mongodb.async.FutureResultCallback +import com.mongodb.internal.TimeoutContext import com.mongodb.internal.async.SingleResultCallback import com.mongodb.internal.binding.AsyncReadBinding +import com.mongodb.internal.connection.OperationContext import org.bson.Document import spock.lang.Specification @@ -31,6 +33,12 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { given: def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS + def wrapped = Mock(AsyncCommandBatchCursor) def callback = Stub(SingleResultCallback) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, @@ -61,11 +69,19 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: 0 * wrapped.close() 0 * binding.release() + + where: + hasTimeoutMS << [true, false] } def 'should not close the cursor in next if the cursor was closed before next completed'() { def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS def wrapped = Mock(AsyncCommandBatchCursor) def callback = Stub(SingleResultCallback) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, @@ -86,11 +102,19 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: cursor.isClosed() + + where: + hasTimeoutMS << [true, false] } def 'should throw a MongoException when next/tryNext is called 
after the cursor is closed'() { def changeStreamOpertation = Stub(ChangeStreamOperation) def binding = Mock(AsyncReadBinding) + def operationContext = Mock(OperationContext) + def timeoutContext = Mock(TimeoutContext) + binding.getOperationContext() >> operationContext + operationContext.getTimeoutContext() >> timeoutContext + timeoutContext.hasTimeoutMS() >> hasTimeoutMS def wrapped = Mock(AsyncCommandBatchCursor) def cursor = new AsyncChangeStreamBatchCursor(changeStreamOpertation, wrapped, binding, null, ServerVersionHelper.FOUR_DOT_FOUR_WIRE_VERSION) @@ -104,6 +128,9 @@ class AsyncChangeStreamBatchCursorSpecification extends Specification { then: def exception = thrown(MongoException) exception.getMessage() == 'next() called after the cursor was closed.' + + where: + hasTimeoutMS << [true, false] } List nextBatch(AsyncChangeStreamBatchCursor cursor) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java new file mode 100644 index 00000000000..63ba9214663 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -0,0 +1,331 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.mongodb.internal.operation; + +import com.mongodb.MongoException; +import com.mongodb.MongoNotPrimaryException; +import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.ServerAddress; +import com.mongodb.connection.ServerDescription; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.binding.ConnectionSource; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.Connection; +import com.mongodb.internal.connection.OperationContext; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.Document; +import org.bson.RawBsonDocument; +import org.bson.codecs.DocumentCodec; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +final class ChangeStreamBatchCursorTest { + + private static final List RESULT_FROM_NEW_CURSOR = new ArrayList<>(); + private final int maxWireVersion = ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION; + private ServerDescription serverDescription; + private TimeoutContext timeoutContext; + private OperationContext operationContext; + private Connection connection; + private ConnectionSource connectionSource; + private ReadBinding readBinding; + private BsonDocument resumeToken; + private CommandBatchCursor commandBatchCursor; + private CommandBatchCursor newCommandBatchCursor; + private ChangeStreamBatchCursor newChangeStreamCursor; + private ChangeStreamOperation changeStreamOperation; + + @Test + @DisplayName("should return result on next") + void shouldReturnResultOnNext() { + when(commandBatchCursor.next()).thenReturn(RESULT_FROM_NEW_CURSOR); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //when + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + } + + @Test + @DisplayName("should throw timeout exception without resume attempt on next") + void shouldThrowTimeoutExceptionWithoutResumeAttemptOnNext() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + 
verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verifyNoResumeAttemptCalled(); + } + + @Test + @DisplayName("should perform resume attempt on next when resumable error is thrown") + void shouldPerformResumeAttemptOnNextWhenResumableErrorIsThrown() { + when(commandBatchCursor.next()).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyResumeAttemptCalled(); + verify(changeStreamOperation, times(1)).getDecoder(); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + verifyNoMoreInteractions(changeStreamOperation); + } + + + @Test + @DisplayName("should resume only once on subsequent calls after timeout error") + void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).next(); + verify(commandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(commandBatchCursor); + verifyNoResumeAttemptCalled(); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when seconds next is called. Resume is attempted. + List next = cursor.next(); + + //then + assertEquals(Collections.emptyList(), next); + verify(timeoutContext, times(1)).resetTimeout(); + verify(commandBatchCursor, times(1)).close(); + verifyNoMoreInteractions(commandBatchCursor); + verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + verify(changeStreamOperation, times(1)).getDecoder(); + verify(changeStreamOperation, times(1)).execute(readBinding); + verifyNoMoreInteractions(changeStreamOperation); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when third next is called. No resume is attempted. 
+ List next2 = cursor.next(); + + //then + assertEquals(Collections.emptyList(), next2); + verifyNoInteractions(commandBatchCursor); + verify(timeoutContext, times(1)).resetTimeout(); + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(readBinding); + verifyNoMoreInteractions(changeStreamOperation); + } + + @Test + @DisplayName("should propagate any errors occurred in aggregate operation during creating new change stream when previous next timed out") + void shouldPropagateAnyErrorsOccurredInAggregateOperation() { + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); + MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()); + when(changeStreamOperation.execute(readBinding)).thenThrow(resumableError); + + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + //when + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + assertThrows(MongoNotPrimaryException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verifyResumeAttemptCalled(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + } + + + @Test + @DisplayName("should perform a resume attempt in subsequent next call when previous resume attempt in next timed out") + void shouldResumeAfterTimeoutInAggregateOnNextCall() { + //given + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //first next operation times out on getMore + when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout during next call")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //second next operation times out on resume attempt when creating change stream + when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoOperationTimeoutException("timeout during resumption")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation); + + doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(readBinding); + + //when third operation succeeds to resume and call next + List next = cursor.next(); + + //then + assertEquals(RESULT_FROM_NEW_CURSOR, next); + verify(timeoutContext, times(1)).resetTimeout(); + + verifyResumeAttemptCalled(); + verify(changeStreamOperation, times(1)).getDecoder(); + verifyNoMoreInteractions(changeStreamOperation); + + verify(newCommandBatchCursor, times(1)).next(); + verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); + verifyNoMoreInteractions(newCommandBatchCursor); + } + + @Test + @DisplayName("should close change stream when resume operation fails due to non-timeout error") + void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() { + //given + ChangeStreamBatchCursor cursor = createChangeStreamCursor(); + + //first next operation times out on getMore + when(commandBatchCursor.next()).thenThrow(new 
MongoOperationTimeoutException("timeout during next call")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + //when second next operation errors on resume attempt when creating change stream + when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + assertThrows(MongoNotPrimaryException.class, cursor::next); + + //then + verify(timeoutContext, times(1)).resetTimeout(); + verifyResumeAttemptCalled(); + verifyNoMoreInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + + + //when third next operation errors with cursor closed exception + doThrow(new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR)).when(commandBatchCursor).next(); + MongoException mongoException = assertThrows(MongoException.class, cursor::next); + + //then + assertEquals(MESSAGE_IF_CLOSED_AS_CURSOR, mongoException.getMessage()); + verify(timeoutContext, times(1)).resetTimeout(); + verifyNoResumeAttemptCalled(); + } + + private ChangeStreamBatchCursor createChangeStreamCursor() { + ChangeStreamBatchCursor cursor = + new ChangeStreamBatchCursor<>(changeStreamOperation, commandBatchCursor, readBinding, null, maxWireVersion); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); + return cursor; + } + + private void verifyNoResumeAttemptCalled() { + verifyNoInteractions(changeStreamOperation); + verifyNoInteractions(newCommandBatchCursor); + verifyNoInteractions(readBinding); + } + + + private void verifyResumeAttemptCalled() { + verify(commandBatchCursor, times(1)).close(); + verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + verify(changeStreamOperation, times(1)).execute(readBinding); + verifyNoMoreInteractions(commandBatchCursor); + } + + @BeforeEach + void setUp() { + resumeToken = new BsonDocument("_id", new BsonInt32(1)); + serverDescription = mock(ServerDescription.class); + when(serverDescription.getMaxWireVersion()).thenReturn(maxWireVersion); + + timeoutContext = mock(TimeoutContext.class); + when(timeoutContext.hasTimeoutMS()).thenReturn(true); + doNothing().when(timeoutContext).resetTimeout(); + + operationContext = mock(OperationContext.class); + when(operationContext.getTimeoutContext()).thenReturn(timeoutContext); + connection = mock(Connection.class); + when(connection.command(any(), any(), any(), any(), any(), any())).thenReturn(null); + connectionSource = mock(ConnectionSource.class); + when(connectionSource.getConnection()).thenReturn(connection); + when(connectionSource.release()).thenReturn(1); + when(connectionSource.getServerDescription()).thenReturn(serverDescription); + + readBinding = mock(ReadBinding.class); + when(readBinding.getOperationContext()).thenReturn(operationContext); + when(readBinding.retain()).thenReturn(readBinding); + when(readBinding.release()).thenReturn(1); + when(readBinding.getReadConnectionSource()).thenReturn(connectionSource); + + + commandBatchCursor = mock(CommandBatchCursor.class); + when(commandBatchCursor.getPostBatchResumeToken()).thenReturn(resumeToken); + doNothing().when(commandBatchCursor).close(); + + newCommandBatchCursor = mock(CommandBatchCursor.class); + 
when(newCommandBatchCursor.getPostBatchResumeToken()).thenReturn(resumeToken); + when(newCommandBatchCursor.next()).thenReturn(RESULT_FROM_NEW_CURSOR); + doNothing().when(newCommandBatchCursor).close(); + + newChangeStreamCursor = mock(ChangeStreamBatchCursor.class); + when(newChangeStreamCursor.getWrapped()).thenReturn(newCommandBatchCursor); + + changeStreamOperation = mock(ChangeStreamOperation.class); + when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec()); + doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); + when(changeStreamOperation.execute(readBinding)).thenReturn(newChangeStreamCursor); + } + +} diff --git a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt index 5c757bf5e65..32f25fe0fd2 100644 --- a/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt +++ b/driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/MongoCursor.kt @@ -76,6 +76,14 @@ public sealed interface MongoCursor : Iterator, Closeable { * } * ``` * + * A [com.mongodb.MongoOperationTimeoutException] does not invalidate the [MongoChangeStreamCursor], but is immediately + * propagated to the caller. Subsequent method calls will attempt to resume operation by establishing a new change + * stream on the server, without performing a `getMore` request first. + * + * If a [com.mongodb.MongoOperationTimeoutException] occurs before any events are received, it indicates that the server + * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the + * current stream and recreate it with a higher timeout setting. + * * @param T The type of documents the cursor contains */ public sealed interface MongoChangeStreamCursor : MongoCursor { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java index 73ae8a05a6f..26aabdbe6b2 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java @@ -94,8 +94,7 @@ public AggregatePublisher maxTime(final long maxTime, final TimeUnit timeUnit @Override public AggregatePublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java index a45e691b72e..cf5a9d9f25b 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java @@ -31,10 +31,12 @@ import org.reactivestreams.Subscriber; import reactor.core.publisher.Mono; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; +import static 
com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PROTECTED) @@ -151,4 +153,15 @@ Mono> batchCursor(final Supplier maxAwaitTimeMS); + + return maxAwaitTimeMS; + } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java index ab7ade7d354..8fc1a093aab 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java @@ -41,7 +41,6 @@ import java.util.function.Function; import static com.mongodb.assertions.Assertions.notNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; final class ChangeStreamPublisherImpl extends BatchCursorPublisher> @@ -124,8 +123,7 @@ public ChangeStreamPublisher comment(@Nullable final BsonValue comment) { @Override public ChangeStreamPublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java index 069954d15a1..eeddef5449a 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java @@ -79,7 +79,7 @@ public FindPublisher maxTime(final long maxTime, final TimeUnit timeUnit) { @Override public FindPublisher maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); + validateMaxAwaitTime(maxAwaitTime, timeUnit); findOptions.maxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 141c60c6233..8fcf38c787e 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -18,32 +18,54 @@ import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCommandException; +import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandStartedEvent; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; import 
com.mongodb.reactivestreams.client.syncadapter.SyncGridFSBucket; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; +import org.bson.Document; import org.bson.types.ObjectId; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.test.StepVerifier; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; +import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.isServerlessTest; import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.ClusterFixture.sleep; +import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -85,6 +107,7 @@ protected boolean isAsync() { @Tag("setsFailPoint") @DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out") @Test + @Override public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -141,6 +164,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Tag("setsFailPoint") @DisplayName("6. GridFS Upload - Aborting an upload stream can be timed out") @Test + @Override public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -196,4 +220,299 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I testSubscriber.assertNoTerminalEvent(); } } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. 
+ */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS applies to full resume attempt in a next call") + @Test + public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); + + //given + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 500, TimeUnit.MILLISECONDS))) { + + MongoNamespace namespace = generateNamespace(); + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"getMore\" ]," + + " errorCode: 7," + + " errorLabels: [\"ResumableChangeStreamError\" ]" + + " }" + + "}"); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch( + singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))); + + Assertions.assertThrows(MongoOperationTimeoutException.class, + () -> Flux.from(documentChangeStreamPublisher).blockFirst(TIMEOUT_DURATION)); + //then + sleep(200); //let publisher invalidate the cursor after the error. + List commandStartedEvents = commandListener.getCommandStartedEvents(); + + List expectedCommandNames = Arrays.asList("aggregate", "getMore", "killCursors", "aggregate", "getMore", "killCursors"); + assertCommandStartedEventsInOder(expectedCommandNames, commandStartedEvents); + + List commandFailedEvents = commandListener.getCommandFailedEvents(); + assertEquals(2, commandFailedEvents.size()); + + CommandFailedEvent firstGetMoreFailedEvent = commandFailedEvents.get(0); + assertEquals("getMore", firstGetMoreFailedEvent.getCommandName()); + assertInstanceOf(MongoCommandException.class, firstGetMoreFailedEvent.getThrowable()); + + CommandFailedEvent secondGetMoreFailedEvent = commandFailedEvents.get(1); + assertEquals("getMore", secondGetMoreFailedEvent.getCommandName()); + assertInstanceOf(MongoOperationTimeoutException.class, secondGetMoreFailedEvent.getThrowable()); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. 
+ */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS applied to initial aggregate") + @Test + public void testTimeoutMSAppliedToInitialAggregate() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); + + //given + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + + MongoNamespace namespace = generateNamespace(); + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch( + singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))) + .fullDocument(FullDocument.UPDATE_LOOKUP); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"aggregate\" ]," + + " blockConnection: true," + + " blockTimeMS: " + (rtt + 201) + + " }" + + "}"); + + //when + Assertions.assertThrows(MongoOperationTimeoutException.class, + () -> Flux.from(documentChangeStreamPublisher).blockFirst(TIMEOUT_DURATION)); + + //We do not expect cursor to have been created. However, publisher closes cursor asynchronously, thus we give it some time + // to make sure that cursor has not been closed (which would indicate that it was created). + sleep(200); + + //then + List commandStartedEvents = commandListener.getCommandStartedEvents(); + assertEquals(1, commandStartedEvents.size()); + assertEquals("aggregate", commandStartedEvents.get(0).getCommandName()); + assertOnlyOneCommandTimeoutFailure("aggregate"); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. 
+ */ + @Tag("setsFailPoint") + @DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is not set") + @Test + public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); + + //given + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); + sleep(2000); + + + long rtt = ClusterFixture.getPrimaryRTT(); + try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() + .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 3}," + + " data: {" + + " failCommands: [\"getMore\", \"aggregate\"]," + + " blockConnection: true," + + " blockTimeMS: " + (rtt + 200) + + " }" + + "}"); + + collectionHelper.insertDocuments(WriteConcern.MAJORITY, + BsonDocument.parse("{x: 1}"), + BsonDocument.parse("{x: 2}"), + + BsonDocument.parse("{x: 3}"), + BsonDocument.parse("{x: 4}"), + + BsonDocument.parse("{x: 5}"), + BsonDocument.parse("{x: 6}")); + + //when + ChangeStreamPublisher documentChangeStreamPublisher = collection.watch() + .startAtOperationTime(startTime); + StepVerifier.create(documentChangeStreamPublisher, 2) + //then + .expectNextCount(2) + .thenAwait(Duration.ofMillis(300)) + .thenRequest(2) + .expectNextCount(2) + .thenAwait(Duration.ofMillis(300)) + .thenRequest(2) + .expectNextCount(2) + .thenAwait(Duration.ofMillis(300)) + .thenRequest(2) + .expectError(MongoOperationTimeoutException.class) + .verify(); + + sleep(500); //let publisher invalidate the cursor after the error. + + List commandStartedEvents = commandListener.getCommandStartedEvents(); + List expectedCommandNames = Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"); + assertCommandStartedEventsInOder(expectedCommandNames, commandStartedEvents); + assertOnlyOneCommandTimeoutFailure("getMore"); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. 
+     */
+    @Tag("setsFailPoint")
+    @DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is set")
+    @Test
+    public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
+        assumeTrue(serverVersionAtLeast(4, 4));
+        assumeTrue(isDiscoverableReplicaSet());
+        assumeFalse(isServerlessTest());
+
+        //given
+        BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
+        collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
+        sleep(2000);
+
+        long rtt = ClusterFixture.getPrimaryRTT();
+        try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
+                .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+
+            MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
+                    .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
+
+            collectionHelper.runAdminCommand("{" +
+                    " configureFailPoint: \"failCommand\"," +
+                    " mode: { times: 2}," +
+                    " data: {" +
+                    " failCommands: [\"aggregate\", \"getMore\"]," +
+                    " blockConnection: true," +
+                    " blockTimeMS: " + (rtt + 200) +
+                    " }" +
+                    "}");
+
+
+            collectionHelper.insertDocuments(WriteConcern.MAJORITY,
+                    BsonDocument.parse("{x: 1}"),
+                    BsonDocument.parse("{x: 2}"),
+
+                    BsonDocument.parse("{x: 3}"),
+                    BsonDocument.parse("{x: 4}"));
+
+            //when
+            ChangeStreamPublisher<Document> documentChangeStreamPublisher = collection.watch()
+                    .maxAwaitTime(1, TimeUnit.MILLISECONDS)
+                    .startAtOperationTime(startTime);
+            StepVerifier.create(documentChangeStreamPublisher, 2)
+                    //then
+                    .expectNextCount(2)
+                    .thenAwait(Duration.ofMillis(600))
+                    .thenRequest(2)
+                    .thenCancel()
+                    .verify();
+
+            sleep(500); //let the publisher invalidate the cursor after the error.
+
+            List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
+            List<String> expectedCommandNames = Arrays.asList("aggregate", "getMore", "killCursors");
+            assertCommandStartedEventsInOrder(expectedCommandNames, commandStartedEvents);
+        }
+    }
+
+    /**
+     * Not a prose spec test. However, it is an additional test case for better coverage.
+     */
+    @Tag("setsFailPoint")
+    @DisplayName("TimeoutMS is honored for next operation when several getMore commands are executed internally")
+    @Test
+    public void testTimeoutMsIsHonoredForNextOperationWhenSeveralGetMoreExecutedInternally() {
+        assumeTrue(serverVersionAtLeast(4, 4));
+        assumeTrue(isDiscoverableReplicaSet());
+        assumeFalse(isServerlessTest());
+
+        //given
+        long rtt = ClusterFixture.getPrimaryRTT();
+        try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
+                .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) {
+
+            MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
+                    .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
+
+            //when
+            ChangeStreamPublisher<Document> documentChangeStreamPublisher = collection.watch();
+            StepVerifier.create(documentChangeStreamPublisher, 2)
+                    //then
+                    .expectError(MongoOperationTimeoutException.class)
+                    .verify();
+
+            sleep(200); //let the publisher invalidate the cursor after the error.
+
+            List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
+            assertCommandStartedEventsInOrder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"),
+                    commandStartedEvents);
+            assertOnlyOneCommandTimeoutFailure("getMore");
+        }
+    }
+
+    private static void assertCommandStartedEventsInOrder(final List<String> expectedCommandNames,
+                                                          final List<CommandStartedEvent> commandStartedEvents) {
+        assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: "
+                + commandStartedEvents.stream()
+                .map(CommandStartedEvent::getCommand)
+                .map(BsonDocument::toJson)
+                .collect(Collectors.toList()));
+
+        for (int i = 0; i < expectedCommandNames.size(); i++) {
+            CommandStartedEvent commandStartedEvent = commandStartedEvents.get(i);
+
+            assertEquals(expectedCommandNames.get(i), commandStartedEvent.getCommandName());
+        }
+    }
+
+    private void assertOnlyOneCommandTimeoutFailure(final String command) {
+        List<CommandFailedEvent> commandFailedEvents = commandListener.getCommandFailedEvents();
+        assertEquals(1, commandFailedEvents.size());
+
+        CommandFailedEvent failedCommandEvent = commandFailedEvents.get(0);
+        assertEquals(command, failedCommandEvent.getCommandName());
+        assertInstanceOf(MongoOperationTimeoutException.class, failedCommandEvent.getThrowable());
+    }
 }
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
index d55c6a78baf..cdf21cadc8d 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
@@ -52,8 +52,24 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String
                                          final BsonArray initialData, final BsonDocument definition) {
         super(schemaVersion, runOnRequirements, entities, initialData, definition);
         this.testDescription = testDescription;
-        // Time sensitive - cannot just create a cursor with publishers
+
+        assumeFalse("No iterateOnce support. There is an alternative prose test for it.",
+                testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set"));
+        assumeFalse("No iterateOnce support. There is an alternative prose test for it.",
+                testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is set"));
+        /*
+           The Reactive Streams specification prevents us from allowing a subsequent next call (an event in reactive terms) after a
+           timeout error, conflicting with the CSOT spec requirement not to invalidate the change stream and to try resuming by
+           establishing a new change stream on the server. We immediately let users know about a timeout error, which then closes the
+           stream/publisher.
+         */
+        assumeFalse("It is not possible due to a conflict with the Reactive Streams specification.",
+                testDescription.equals("change stream can be iterated again if previous iteration times out"));
+        assumeFalse("Flaky and racy due to asynchronous behaviour. There is an alternative prose test for it.",
+                testDescription.equals("timeoutMS applies to full resume attempt in a next call"));
+        assumeFalse("No way to catch an error on BatchCursor creation. There is an alternative prose test for it.",
+                testDescription.equals("timeoutMS applied to initial aggregate"));
+
         assumeFalse(testDescription.endsWith("createChangeStream on client"));
         assumeFalse(testDescription.endsWith("createChangeStream on database"));
         assumeFalse(testDescription.endsWith("createChangeStream on collection"));
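As the skip rationale above notes, a timeout error terminates the publisher, so a reactive application resumes by creating a new change stream publisher rather than requesting more from the terminated one. A minimal sketch, assuming a MongoCollection<Document> named collection and Project Reactor's retry utilities (the names and the retry policy here are illustrative, not part of this patch):

    Flux.defer(() -> collection.watch())
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                    .filter(e -> e instanceof MongoOperationTimeoutException))
            .subscribe(event -> {
                // process the ChangeStreamDocument; the application may also track
                // event.getResumeToken() and pass it to startAfter(...) when building the new publisher
            });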
diff --git a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java
index 38e33c8ae8e..ed58412496d 100644
--- a/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java
+++ b/driver-sync/src/main/com/mongodb/client/MongoChangeStreamCursor.java
@@ -33,6 +33,16 @@
 * }
 * }
 *
+ *
+ *
+ * A {@link com.mongodb.MongoOperationTimeoutException} does not invalidate the {@link MongoChangeStreamCursor}, but is immediately
+ * propagated to the caller. A subsequent method call will attempt to resume the operation by establishing a new change stream on the
+ * server, without issuing a {@code getMore} request first.
+ *
+ * If a {@link com.mongodb.MongoOperationTimeoutException} occurs before any events are received, it indicates that the server
+ * has timed out before it could finish processing the existing oplog. In such cases, it is recommended to close the current stream
+ * and recreate it with a higher timeout setting.
+ *
 * @since 3.11
 * @param The type of documents the cursor contains
 */
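A minimal sketch of the behaviour documented above for the blocking API, assuming a MongoCollection<Document> named collection whose client has a timeoutMS configured (the handling shown is illustrative, not prescribed by this patch):

    try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch().cursor()) {
        while (true) {
            try {
                ChangeStreamDocument<Document> event = cursor.next();
                // process the event
            } catch (MongoOperationTimeoutException e) {
                // The cursor remains usable: the next call resumes by opening a new change stream
                // on the server instead of issuing another getMore. If this happens before any event
                // has ever been received, prefer closing the cursor and recreating it with a higher timeout.
            }
        }
    }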
diff --git a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java
index ad052738948..4feb06fa9b9 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java
@@ -132,8 +132,7 @@ public AggregateIterable maxTime(final long maxTime, final TimeUnit tim
     @Override
     public AggregateIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
-        notNull("timeUnit", timeUnit);
-        this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
+        this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit);
         return this;
     }
diff --git a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java
index 3a24da53984..4b7b3865569 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
 import static com.mongodb.assertions.Assertions.notNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 /**
  * This class is not part of the public API and may be removed or changed at any time
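The maxAwaitTime hunks that follow route the supplied value through a new MongoIterableImpl#validateMaxAwaitTime check (shown further below), which requires maxAwaitTime to be strictly less than timeoutMS whenever timeoutMS is set and non-zero. A sketch of the resulting behaviour with illustrative values, assuming collection is a MongoCollection<Document>:

    MongoCollection<Document> bounded = collection.withTimeout(500, TimeUnit.MILLISECONDS);
    bounded.watch().maxAwaitTime(100, TimeUnit.MILLISECONDS);  // accepted: 100 < 500
    bounded.watch().maxAwaitTime(500, TimeUnit.MILLISECONDS);  // rejected: IllegalArgumentException from isTrueArgument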
@@ -116,8 +115,7 @@ public ChangeStreamIterable batchSize(final int batchSize) { @Override public ChangeStreamIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); - this.maxAwaitTimeMS = MILLISECONDS.convert(maxAwaitTime, timeUnit); + this.maxAwaitTimeMS = validateMaxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java index 2d6fb54b333..df8b17866ac 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/FindIterableImpl.java @@ -90,7 +90,7 @@ public FindIterable maxTime(final long maxTime, final TimeUnit timeUnit @Override public FindIterable maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { - notNull("timeUnit", timeUnit); + validateMaxAwaitTime(maxAwaitTime, timeUnit); findOptions.maxAwaitTime(maxAwaitTime, timeUnit); return this; } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java index ce7f517545d..292537231cb 100755 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoCollectionImpl.java @@ -199,7 +199,7 @@ public MongoCollection withReadConcern(final ReadConcern readConcern) public MongoCollection withTimeout(final long timeout, final TimeUnit timeUnit) { isTrueArgument("timeout >= 0", timeout >= 0); notNull("timeUnit", timeUnit); - TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(timeUnit.convert(timeout, TimeUnit.MILLISECONDS)); + TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)); return new MongoCollectionImpl<>(namespace, documentClass, codecRegistry, readPreference, writeConcern, retryWrites, retryReads, readConcern, uuidRepresentation, autoEncryptionSettings, newTimeoutSettings, executor); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java index 80b91796ca0..21c73a2bde0 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoDatabaseImpl.java @@ -152,7 +152,7 @@ public MongoDatabase withReadConcern(final ReadConcern readConcern) { public MongoDatabase withTimeout(final long timeout, final TimeUnit timeUnit) { isTrueArgument("timeout >= 0", timeout >= 0); notNull("timeUnit", timeUnit); - TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(timeUnit.convert(timeout, TimeUnit.MILLISECONDS)); + TimeoutSettings newTimeoutSettings = timeoutSettings.withTimeoutMS(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)); return new MongoDatabaseImpl(name, codecRegistry, readPreference, writeConcern, retryWrites, retryReads, readConcern, uuidRepresentation, autoEncryptionSettings, newTimeoutSettings, executor); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java index 0e1a46a8dd7..d4b948c07a1 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java @@ -29,8 +29,10 @@ import 
com.mongodb.lang.Nullable; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; /** @@ -153,4 +155,16 @@ public > A into(final A target) { private BatchCursor execute() { return getExecutor().execute(asReadOperation(), readPreference, readConcern, clientSession); } + + + protected long validateMaxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) { + notNull("timeUnit", timeUnit); + Long timeoutMS = timeoutSettings.getTimeoutMS(); + long maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit); + + isTrueArgument("maxAwaitTimeMS must be less than timeoutMS", timeoutMS == null || timeoutMS == 0 + || timeoutMS > maxAwaitTimeMS); + + return maxAwaitTimeMS; + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 69784b4bb6d..43b4897a318 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -96,7 +96,7 @@ public abstract class AbstractClientSideOperationsTimeoutProseTest { protected static final String GRID_FS_BUCKET_NAME = "db.fs"; private static final AtomicInteger COUNTER = new AtomicInteger(); - private MongoNamespace namespace; + protected MongoNamespace namespace; protected MongoNamespace gridFsFileNamespace; protected MongoNamespace gridFsChunksNamespace; @@ -275,6 +275,7 @@ public void testBlockingIterationMethodsChangeStream() { @Tag("setsFailPoint") @DisplayName("6. 
GridFS Upload - uploads via openUploadStream can be timed out") + @Test public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); long rtt = ClusterFixture.getPrimaryRTT(); @@ -475,7 +476,7 @@ private static Stream test8ServerSelectionHandshakeArguments() { ); } - private MongoNamespace generateNamespace() { + protected MongoNamespace generateNamespace() { return new MongoNamespace(getDefaultDatabaseName(), getClass().getSimpleName() + "_" + COUNTER.incrementAndGet()); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java index 2091bbe975f..ddfd80ef1ec 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java @@ -79,13 +79,11 @@ public static void checkSkipCSOTTest(final String fileDescription, final String || testDescription.contains("runCommand on database")); assumeFalse("No count command helper", testDescription.endsWith("count on collection")); assumeFalse("No operation based overrides", fileDescription.equals("timeoutMS can be overridden for an operation")); - assumeFalse("No iterateOnce support", testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set")); assumeFalse("TODO (CSOT) - JAVA-5259 No client.withTimeout", testDescription.endsWith("on client")); checkTransactionSessionSupport(fileDescription, testDescription); - assumeFalse("TODO (CSOT) - JAVA-4054", fileDescription.equals("timeoutMS behaves correctly for change streams")); assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("timeoutMS behaves correctly for retryable operations")); assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("legacy timeouts behave correctly for retryable operations")); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 33e22167163..a017e350157 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -1556,34 +1556,33 @@ public OperationResult createChangeStreamCursor(final BsonDocument operation) { throw new UnsupportedOperationException("No entity found for id: " + entityName); } - for (Map.Entry cur : arguments.entrySet()) { - switch (cur.getKey()) { - case "batchSize": - iterable.batchSize(cur.getValue().asNumber().intValue()); - break; - case "pipeline": - break; - case "comment": - iterable.comment(cur.getValue()); - break; - case "fullDocument": - iterable.fullDocument(FullDocument.fromString(cur.getValue().asString().getValue())); - break; - case "fullDocumentBeforeChange": - iterable.fullDocumentBeforeChange(FullDocumentBeforeChange.fromString(cur.getValue().asString().getValue())); - break; - case "maxAwaitTimeMS": - iterable.maxAwaitTime(cur.getValue().asNumber().longValue(), TimeUnit.MILLISECONDS); - break; - case "showExpandedEvents": - iterable.showExpandedEvents(cur.getValue().asBoolean().getValue()); - break; - default: - throw new UnsupportedOperationException("Unsupported argument: " + cur.getKey()); - } - } - return resultOf(() -> { + for (Map.Entry cur : arguments.entrySet()) { + switch (cur.getKey()) { + case "batchSize": + 
iterable.batchSize(cur.getValue().asNumber().intValue()); + break; + case "pipeline": + break; + case "comment": + iterable.comment(cur.getValue()); + break; + case "fullDocument": + iterable.fullDocument(FullDocument.fromString(cur.getValue().asString().getValue())); + break; + case "fullDocumentBeforeChange": + iterable.fullDocumentBeforeChange(FullDocumentBeforeChange.fromString(cur.getValue().asString().getValue())); + break; + case "maxAwaitTimeMS": + iterable.maxAwaitTime(cur.getValue().asNumber().longValue(), TimeUnit.MILLISECONDS); + break; + case "showExpandedEvents": + iterable.showExpandedEvents(cur.getValue().asBoolean().getValue()); + break; + default: + throw new UnsupportedOperationException("Unsupported argument: " + cur.getKey()); + } + } MongoCursor changeStreamWrappingCursor = createChangeStreamWrappingCursor(iterable); entities.addCursor(operation.getString("saveResultAsEntity", new BsonString(createRandomEntityId())).getValue(), changeStreamWrappingCursor); @@ -1602,6 +1601,18 @@ public OperationResult executeIterateUntilDocumentOrError(final BsonDocument ope return resultOf(cursor::next); } + + public OperationResult executeIterateOnce(final BsonDocument operation) { + String id = operation.getString("object").getValue(); + MongoCursor cursor = entities.getCursor(id); + + if (operation.containsKey("arguments")) { + throw new UnsupportedOperationException("Unexpected arguments " + operation.get("arguments")); + } + + return resultOf(cursor::tryNext); + } + public OperationResult close(final BsonDocument operation) { String id = operation.getString("object").getValue(); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 6827ac5e39b..f972d47805a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -468,6 +468,8 @@ private OperationResult executeOperation(final UnifiedTestContext context, final return crudHelper.close(operation); case "iterateUntilDocumentOrError": return crudHelper.executeIterateUntilDocumentOrError(operation); + case "iterateOnce": + return crudHelper.executeIterateOnce(operation); case "delete": return gridFSHelper.executeDelete(operation); case "drop":