Skip to content

Add Client Operation Timeout to Change Streams #1321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ configure(javaCodeCheckedProjects) {
testImplementation platform('org.spockframework:spock-bom:2.1-groovy-3.0')
testImplementation 'org.spockframework:spock-core'
testImplementation 'org.spockframework:spock-junit4'
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("org.mockito:mockito-inline:3.8.0")
testImplementation 'org.mockito:mockito-core:4.0.0'
testImplementation 'org.mockito:mockito-inline:4.0.0'
testImplementation 'cglib:cglib-nodep:2.2.2'
testImplementation 'org.objenesis:objenesis:1.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public TimeoutSettings getTimeoutSettings() {
}

public long getMaxAwaitTimeMS() {
return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS();
Copy link
Member Author

@vbabanin vbabanin Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In accordance with the CSOT specifications for the tailable-await cursor:

If the maxAwaitTimeMS option is set, drivers MUST use it as the maxTimeMS field on getMore commands.

The logic to determine whether maxAwaitTimeMS should be utilized for getMore is located in the AggregateOperationImpl and FindOperation.

return timeoutSettings.getMaxAwaitTimeMS();
}

public long getMaxTimeMS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.mongodb.internal.operation;

import com.mongodb.CursorType;
import com.mongodb.MongoNamespace;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
Expand Down Expand Up @@ -68,6 +71,7 @@ class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T
private BsonValue hint;
private BsonDocument variables;
private TimeoutMode timeoutMode;
private CursorType cursorType;

AggregateOperationImpl(final MongoNamespace namespace,
final List<BsonDocument> pipeline, final Decoder<T> decoder, final AggregationLevel aggregationLevel) {
Expand Down Expand Up @@ -146,6 +150,19 @@ AggregateOperationImpl<T> retryReads(final boolean retryReads) {
return this;
}

/**
* Sets the cursor type of this operation and returns the operation for call chaining.
* <p>
* When {@link TimeoutContext#hasTimeoutMS()} is {@code true}, the use of {@link TimeoutSettings#getMaxAwaitTimeMS()} in
* {@code getMore} commands depends on the type of cursor: for {@link CursorType#TailableAwait} it is used, for others it is not.
* {@link CursorType#TailableAwait} is used mainly for change streams in {@link AggregateOperationImpl}.
* </p>
*
* @param cursorType the cursor type to store on this operation
* @return this
*/
AggregateOperationImpl<T> cursorType(final CursorType cursorType) {
this.cursorType = cursorType;
return this;
}

boolean getRetryReads() {
return retryReads;
}
Expand Down Expand Up @@ -221,13 +238,13 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) ->
new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
}

private TimeoutMode getTimeoutMode() {
Expand All @@ -238,8 +255,12 @@ private TimeoutMode getTimeoutMode() {
return localTimeoutMode;
}

private long getMaxTimeForCursor(final OperationContext operationContext) {
return operationContext.getTimeoutContext().getMaxAwaitTimeMS();
/**
 * Computes the max time value for the batch cursor from the given timeout context.
 * <p>
 * If {@code timeoutContext.hasTimeoutMS()} is {@code false}, the context's {@code maxAwaitTimeMS} is returned unchanged.
 * Otherwise {@code maxAwaitTimeMS} is returned only for a {@link CursorType#TailableAwait} cursor, and {@code 0} for any other
 * cursor type (including an unset one).
 * </p>
 *
 * @param timeoutContext the timeout context to read {@code maxAwaitTimeMS} and {@code hasTimeoutMS()} from
 * @return {@code maxAwaitTimeMS} or {@code 0}, as described above
 */
private long getMaxTimeForCursor(final TimeoutContext timeoutContext) {
    long maxAwaitTimeMS = timeoutContext.getMaxAwaitTimeMS();
    boolean awaitTimeApplies = !timeoutContext.hasTimeoutMS() || cursorType == CursorType.TailableAwait;
    return awaitTimeApplies ? maxAwaitTimeMS : 0;
}

interface AggregateTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.operation;

import com.mongodb.MongoException;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
Expand All @@ -42,6 +43,7 @@

final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
private final AsyncReadBinding binding;
private final TimeoutContext timeoutContext;
private final ChangeStreamOperation<T> changeStreamOperation;
private final int maxWireVersion;

Expand All @@ -63,6 +65,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
this.binding = binding;
binding.retain();
this.timeoutContext = binding.getOperationContext().getTimeoutContext();
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
isClosed = new AtomicBoolean();
Expand All @@ -80,6 +83,7 @@ public void next(final SingleResultCallback<List<T>> callback) {

@Override
public void close() {
resetTimeout();
if (isClosed.compareAndSet(false, true)) {
try {
nullifyAndCloseWrapped();
Expand Down Expand Up @@ -177,6 +181,7 @@ private interface AsyncBlock {
}

private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<T>> callback, final boolean tryNext) {
resetTimeout();
SingleResultCallback<List<T>> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
if (isClosed()) {
errHandlingCallback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
Expand Down Expand Up @@ -219,12 +224,12 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
assertNotNull(source).getServerDescription().getMaxWireVersion());
source.release();
changeStreamOperation.executeAsync(binding, (result, t1) -> {
changeStreamOperation.executeAsync(binding, (asyncBatchCursor, t1) -> {
if (t1 != null) {
callback.onResult(null, t1);
} else {
try {
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) asyncBatchCursor).getWrapped());
} finally {
try {
binding.release(); // release the new change stream batch cursor's reference to the binding
Expand All @@ -237,4 +242,10 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
}
});
}

/**
 * Resets the timeout of this cursor's {@code timeoutContext}; does nothing when
 * {@code timeoutContext.hasTimeoutMS()} is {@code false}.
 */
private void resetTimeout() {
    if (!timeoutContext.hasTimeoutMS()) {
        return;
    }
    timeoutContext.resetTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
return commandCursorResult;
}

/**
 * Forwards the {@code closeWithoutTimeoutReset} flag to this cursor's resource manager.
 * NOTE(review): presumably the flag controls whether closing the cursor resets the operation timeout first -
 * confirm against {@code CursorResourceManager}.
 *
 * @param closeWithoutTimeoutReset the flag value to pass to the resource manager
 */
void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) {
    resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset);
}

@ThreadSafe
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
Expand All @@ -37,26 +39,50 @@
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;

/**
* A change stream cursor that wraps {@link CommandBatchCursor} with automatic resumption capabilities in the event
* of timeouts or transient errors.
* <p>
* Upon encountering a resumable error during {@code hasNext()}, {@code next()}, or {@code tryNext()} calls, the {@link ChangeStreamBatchCursor}
* attempts to establish a new change stream on the server.
* </p>
* If an error occurring during any of these method calls is not resumable, it is immediately propagated to the caller, and the {@link ChangeStreamBatchCursor}
* is closed and invalidated on the server. Server errors that occur during this invalidation process are not propagated to the caller.
* <p>
* A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller.
* The subsequent method call will attempt to resume the operation by establishing a new change stream on the server, without first issuing
* a {@code getMore} request.
* </p>
*/
final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T> {
private final ReadBinding binding;
private final ChangeStreamOperation<T> changeStreamOperation;
private final int maxWireVersion;

private final TimeoutContext timeoutContext;
private CommandBatchCursor<RawBsonDocument> wrapped;
private BsonDocument resumeToken;
private final AtomicBoolean closed;

/**
* This flag is used to manage change stream resumption logic after a timeout error.
* Indicates whether the last {@code hasNext()}, {@code next()}, or {@code tryNext()} call resulted in a {@link MongoOperationTimeoutException}.
* If {@code true}, indicates a timeout occurred, prompting an attempt to resume the change stream on the subsequent call.
*/
private boolean lastOperationTimedOut;

/**
* Creates a change stream cursor around an already opened {@link CommandBatchCursor}.
*
* @param changeStreamOperation the operation that is re-executed when the change stream has to be resumed
* @param wrapped the initial underlying cursor
* @param binding the read binding; it is retained here and its operation context supplies the {@link TimeoutContext}
* @param resumeToken the initial resume token, may be {@code null}
* @param maxWireVersion the max wire version, later passed to the resumable-error check
*/
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
final CommandBatchCursor<RawBsonDocument> wrapped,
final ReadBinding binding,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
// Keep the timeout context so resetTimeout() can use it on later calls.
this.timeoutContext = binding.getOperationContext().getTimeoutContext();
this.changeStreamOperation = changeStreamOperation;
// This cursor takes its own reference to the binding.
this.binding = binding.retain();
this.wrapped = wrapped;
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
closed = new AtomicBoolean();
// No operation has run yet, so there is no timed-out call to resume from.
lastOperationTimedOut = false;
}

CommandBatchCursor<RawBsonDocument> getWrapped() {
Expand Down Expand Up @@ -107,6 +133,7 @@ public List<T> tryNext() {
@Override
public void close() {
if (!closed.getAndSet(true)) {
resetTimeout();
wrapped.close();
binding.release();
}
Expand Down Expand Up @@ -184,22 +211,56 @@ static <T> List<T> convertAndProduceLastId(final List<RawBsonDocument> rawDocume
}

/**
 * Runs {@code function} against the wrapped cursor through {@code execute}, after resetting the timeout.
 * <p>
 * The outcome is recorded in {@code lastOperationTimedOut}: {@code false} after a successful call, and after a failure
 * {@code true} only if the thrown exception is a {@link MongoOperationTimeoutException}. The exception itself is rethrown unchanged.
 * </p>
 *
 * @param function the cursor operation to run
 * @param <R> the result type of the operation
 * @return the value produced by {@code function}
 */
<R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) {
    resetTimeout();
    final R outcome;
    try {
        outcome = execute(function);
    } catch (Throwable failure) {
        // Remember whether this call timed out before propagating the failure.
        lastOperationTimedOut = isTimeoutException(failure);
        throw failure;
    }
    lastOperationTimedOut = false;
    return outcome;
}

/**
 * Applies {@code function} to the wrapped cursor, resuming the change stream as needed.
 * <p>
 * If the previous operation timed out (see {@code hasPreviousNextTimedOut()}), the change stream is resumed before the first
 * attempt. When an attempt fails with a resumable error, the loop resumes the change stream and tries again; any other error is
 * rethrown via {@link MongoException#fromThrowableNonNull(Throwable)}.
 * </p>
 *
 * @param function the cursor operation to run
 * @param <R> the result type of the operation
 * @return the value produced by {@code function}
 */
private <R> R execute(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) {
    boolean shouldBeResumed = hasPreviousNextTimedOut();
    while (true) {
        if (shouldBeResumed) {
            // resumeChangeStream() closes the current wrapped cursor itself before replacing it,
            // so no separate wrapped.close() is needed after a resumable failure.
            resumeChangeStream();
        }
        try {
            return function.apply(wrapped);
        } catch (Throwable t) {
            if (!isResumableError(t, maxWireVersion)) {
                throw MongoException.fromThrowableNonNull(t);
            }
            shouldBeResumed = true;
        }
    }
}

/**
 * Replaces the wrapped cursor with one from a newly executed change stream operation.
 * <p>
 * Closes the current wrapped cursor, updates the operation's resume options using the stored resume token and the max wire
 * version of the selected server, executes the operation again, and adopts the new cursor's wrapped cursor.
 * </p>
 */
private void resumeChangeStream() {
    wrapped.close();

    withReadConnectionSource(binding, source -> {
        int serverMaxWireVersion = source.getServerDescription().getMaxWireVersion();
        changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, serverMaxWireVersion);
        return null;
    });
    ChangeStreamBatchCursor<T> resumedCursor = (ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding);
    wrapped = resumedCursor.getWrapped();
    // release the new change stream batch cursor's reference to the binding
    binding.release();
}

/**
 * Tells whether the previous operation timed out while this cursor is still open.
 *
 * @return {@code true} if {@code lastOperationTimedOut} is set and the cursor has not been closed
 */
private boolean hasPreviousNextTimedOut() {
    if (!lastOperationTimedOut) {
        return false;
    }
    return !closed.get();
}

withReadConnectionSource(binding, source -> {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
return null;
});
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped();
binding.release(); // release the new change stream batch cursor's reference to the binding
/**
 * Resets the timeout of this cursor's {@code timeoutContext}; does nothing when
 * {@code timeoutContext.hasTimeoutMS()} is {@code false}.
 */
private void resetTimeout() {
    if (!timeoutContext.hasTimeoutMS()) {
        return;
    }
    timeoutContext.resetTimeout();
}

/**
 * Checks whether the given throwable is a {@link MongoOperationTimeoutException}.
 *
 * @param exception the throwable to inspect
 * @return {@code true} if {@code exception} is an instance of {@link MongoOperationTimeoutException}
 */
private static boolean isTimeoutException(final Throwable exception) {
    return MongoOperationTimeoutException.class.isInstance(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package com.mongodb.internal.operation;

import com.mongodb.CursorType;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
Expand All @@ -42,6 +44,7 @@

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.client.cursor.TimeoutMode.CURSOR_LIFETIME;

/**
* An operation that executes an {@code $changeStream} aggregation.
Expand Down Expand Up @@ -71,7 +74,7 @@ public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<BsonDocument> pipeline, final Decoder<T> decoder,
final ChangeStreamLevel changeStreamLevel) {
this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC, getAggregateTarget(),
getPipelineCreator());
getPipelineCreator()).cursorType(CursorType.TailableAwait);
this.fullDocument = notNull("fullDocument", fullDocument);
this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange);
this.decoder = notNull("decoder", decoder);
Expand Down Expand Up @@ -167,21 +170,49 @@ public ChangeStreamOperation<T> showExpandedEvents(final boolean showExpandedEve
return this;
}

/**
 * Gets the aggregate operation to execute, taking the timeout settings into account.
 * <p>
 * Change streams act similarly to tailable awaitData cursors, with identical timeoutMS option behavior.
 * Key distinctions include:
 * </p>
 * <ul>
 *   <li>The timeoutMS option must be applied at the start of the aggregate operation for change streams.</li>
 *   <li>Change streams support resumption on next() calls. The driver handles automatic resumption for transient errors.</li>
 * </ul>
 * <p>
 * As a result, when {@code timeoutContext.hasTimeoutMS()} is {@code true}, {@code wrapped} is configured with the
 * {@code CURSOR_LIFETIME} timeout mode to manage the underlying cursor's lifespan; otherwise {@code wrapped} is returned as is.
 * </p>
 *
 * @param timeoutContext the timeout context of the operation being executed
 * @return the aggregate operation to execute
 */
private AggregateOperationImpl<RawBsonDocument> getAggregateOperation(final TimeoutContext timeoutContext) {
    return timeoutContext.hasTimeoutMS() ? wrapped.timeoutMode(CURSOR_LIFETIME) : wrapped;
}

@Override
public BatchCursor<T> execute(final ReadBinding binding) {
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) wrapped.execute(binding);
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding);
cursor.setCloseWithoutTimeoutReset(true);

return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion());
}

@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
wrapped.executeAsync(binding, (result, t) -> {
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
getAggregateOperation(timeoutContext).executeAsync(binding, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
AsyncCommandBatchCursor<RawBsonDocument> cursor = (AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result);
cursor.setCloseWithoutTimeoutReset(true);

callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null);
Expand Down
Loading