Skip to content

Commit ab1ec84

Browse files
authored
Add Client Operation Timeout to Change Streams (#1321)
- Trigger timeout error when maxAwaitTimeMS exceeds a timeoutMS. - Apply timeoutMS to initial aggregate and subsequent next calls, without appending maxTimeMS to getMore. - Utilize maxAwaitTimeMS as maxTimeMS in getMore commands. - Preserve change stream validity upon timeout errors. JAVA-4054
1 parent 45895b2 commit ab1ec84

34 files changed

+976
-73
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,8 @@ configure(javaCodeCheckedProjects) {
257257
testImplementation platform('org.spockframework:spock-bom:2.1-groovy-3.0')
258258
testImplementation 'org.spockframework:spock-core'
259259
testImplementation 'org.spockframework:spock-junit4'
260-
testImplementation("org.mockito:mockito-core:3.8.0")
261-
testImplementation("org.mockito:mockito-inline:3.8.0")
260+
testImplementation 'org.mockito:mockito-core:4.0.0'
261+
testImplementation 'org.mockito:mockito-inline:4.0.0'
262262
testImplementation 'cglib:cglib-nodep:2.2.2'
263263
testImplementation 'org.objenesis:objenesis:1.3'
264264
testImplementation 'org.hamcrest:hamcrest-all:1.3'

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public TimeoutSettings getTimeoutSettings() {
167167
}
168168

169169
public long getMaxAwaitTimeMS() {
170-
return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS();
170+
return timeoutSettings.getMaxAwaitTimeMS();
171171
}
172172

173173
public long getMaxTimeMS() {

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.CursorType;
1920
import com.mongodb.MongoNamespace;
2021
import com.mongodb.client.cursor.TimeoutMode;
2122
import com.mongodb.client.model.Collation;
23+
import com.mongodb.internal.TimeoutContext;
24+
import com.mongodb.internal.TimeoutSettings;
2225
import com.mongodb.internal.async.AsyncBatchCursor;
2326
import com.mongodb.internal.async.SingleResultCallback;
2427
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -68,6 +71,7 @@ class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T
6871
private BsonValue hint;
6972
private BsonDocument variables;
7073
private TimeoutMode timeoutMode;
74+
private CursorType cursorType;
7175

7276
AggregateOperationImpl(final MongoNamespace namespace,
7377
final List<BsonDocument> pipeline, final Decoder<T> decoder, final AggregationLevel aggregationLevel) {
@@ -146,6 +150,19 @@ AggregateOperationImpl<T> retryReads(final boolean retryReads) {
146150
return this;
147151
}
148152

153+
/**
154+
* When {@link TimeoutContext#hasTimeoutMS()} then {@link TimeoutSettings#getMaxAwaitTimeMS()} usage in {@code getMore} commands
155+
* depends on the type of cursor. For {@link CursorType#TailableAwait} it is used, for others it is not.
156+
* {@link CursorType#TailableAwait} is used mainly for change streams in {@link AggregateOperationImpl}.
157+
*
158+
* @param cursorType
159+
* @return this
160+
*/
161+
AggregateOperationImpl<T> cursorType(final CursorType cursorType) {
162+
this.cursorType = cursorType;
163+
return this;
164+
}
165+
149166
boolean getRetryReads() {
150167
return retryReads;
151168
}
@@ -221,13 +238,13 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi
221238
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
222239
return (result, source, connection) ->
223240
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
224-
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
241+
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
225242
}
226243

227244
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
228245
return (result, source, connection) ->
229246
new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
230-
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
247+
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
231248
}
232249

233250
private TimeoutMode getTimeoutMode() {
@@ -238,8 +255,12 @@ private TimeoutMode getTimeoutMode() {
238255
return localTimeoutMode;
239256
}
240257

241-
private long getMaxTimeForCursor(final OperationContext operationContext) {
242-
return operationContext.getTimeoutContext().getMaxAwaitTimeMS();
258+
private long getMaxTimeForCursor(final TimeoutContext timeoutContext) {
259+
long maxAwaitTimeMS = timeoutContext.getMaxAwaitTimeMS();
260+
if (timeoutContext.hasTimeoutMS()){
261+
return CursorType.TailableAwait == cursorType ? maxAwaitTimeMS : 0;
262+
}
263+
return maxAwaitTimeMS;
243264
}
244265

245266
interface AggregateTarget {

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoException;
20+
import com.mongodb.internal.TimeoutContext;
2021
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
2122
import com.mongodb.internal.async.AsyncBatchCursor;
2223
import com.mongodb.internal.async.SingleResultCallback;
@@ -42,6 +43,7 @@
4243

4344
final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
4445
private final AsyncReadBinding binding;
46+
private final TimeoutContext timeoutContext;
4547
private final ChangeStreamOperation<T> changeStreamOperation;
4648
private final int maxWireVersion;
4749

@@ -63,6 +65,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
6365
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
6466
this.binding = binding;
6567
binding.retain();
68+
this.timeoutContext = binding.getOperationContext().getTimeoutContext();
6669
this.resumeToken = resumeToken;
6770
this.maxWireVersion = maxWireVersion;
6871
isClosed = new AtomicBoolean();
@@ -80,6 +83,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
8083

8184
@Override
8285
public void close() {
86+
resetTimeout();
8387
if (isClosed.compareAndSet(false, true)) {
8488
try {
8589
nullifyAndCloseWrapped();
@@ -177,6 +181,7 @@ private interface AsyncBlock {
177181
}
178182

179183
private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<T>> callback, final boolean tryNext) {
184+
resetTimeout();
180185
SingleResultCallback<List<T>> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
181186
if (isClosed()) {
182187
errHandlingCallback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
@@ -219,12 +224,12 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
219224
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
220225
assertNotNull(source).getServerDescription().getMaxWireVersion());
221226
source.release();
222-
changeStreamOperation.executeAsync(binding, (result, t1) -> {
227+
changeStreamOperation.executeAsync(binding, (asyncBatchCursor, t1) -> {
223228
if (t1 != null) {
224229
callback.onResult(null, t1);
225230
} else {
226231
try {
227-
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
232+
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) asyncBatchCursor).getWrapped());
228233
} finally {
229234
try {
230235
binding.release(); // release the new change stream batch cursor's reference to the binding
@@ -237,4 +242,10 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
237242
}
238243
});
239244
}
245+
246+
private void resetTimeout() {
247+
if (timeoutContext.hasTimeoutMS()) {
248+
timeoutContext.resetTimeout();
249+
}
250+
}
240251
}

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

+4
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
212212
return commandCursorResult;
213213
}
214214

215+
void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) {
216+
this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset);
217+
}
218+
215219
@ThreadSafe
216220
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
217221

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

+69-8
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import com.mongodb.MongoChangeStreamException;
2020
import com.mongodb.MongoException;
21+
import com.mongodb.MongoOperationTimeoutException;
2122
import com.mongodb.ServerAddress;
2223
import com.mongodb.ServerCursor;
24+
import com.mongodb.internal.TimeoutContext;
2325
import com.mongodb.internal.binding.ReadBinding;
2426
import com.mongodb.lang.Nullable;
2527
import org.bson.BsonDocument;
@@ -37,26 +39,50 @@
3739
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
3840
import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;
3941

42+
/**
43+
* A change stream cursor that wraps {@link CommandBatchCursor} with automatic resumption capabilities in the event
44+
* of timeouts or transient errors.
45+
* <p>
46+
* Upon encountering a resumable error during {@code hasNext()}, {@code next()}, or {@code tryNext()} calls, the {@link ChangeStreamBatchCursor}
47+
* attempts to establish a new change stream on the server.
48+
* </p>
49+
* If an error occurring during any of these method calls is not resumable, it is immediately propagated to the caller, and the {@link ChangeStreamBatchCursor}
50+
* is closed and invalidated on the server. Server errors that occur during this invalidation process are not propagated to the caller.
51+
* <p>
52+
* A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller.
53+
* Subsequent method call will attempt to resume operation by establishing a new change stream on the server, without doing {@code getMore}
54+
* request first.
55+
* </p>
56+
*/
4057
final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T> {
4158
private final ReadBinding binding;
4259
private final ChangeStreamOperation<T> changeStreamOperation;
4360
private final int maxWireVersion;
44-
61+
private final TimeoutContext timeoutContext;
4562
private CommandBatchCursor<RawBsonDocument> wrapped;
4663
private BsonDocument resumeToken;
4764
private final AtomicBoolean closed;
4865

66+
/**
67+
* This flag is used to manage change stream resumption logic after a timeout error.
68+
* Indicates whether the last {@code hasNext()}, {@code next()}, or {@code tryNext()} call resulted in a {@link MongoOperationTimeoutException}.
69+
* If {@code true}, indicates a timeout occurred, prompting an attempt to resume the change stream on the subsequent call.
70+
*/
71+
private boolean lastOperationTimedOut;
72+
4973
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
5074
final CommandBatchCursor<RawBsonDocument> wrapped,
5175
final ReadBinding binding,
5276
@Nullable final BsonDocument resumeToken,
5377
final int maxWireVersion) {
78+
this.timeoutContext = binding.getOperationContext().getTimeoutContext();
5479
this.changeStreamOperation = changeStreamOperation;
5580
this.binding = binding.retain();
5681
this.wrapped = wrapped;
5782
this.resumeToken = resumeToken;
5883
this.maxWireVersion = maxWireVersion;
5984
closed = new AtomicBoolean();
85+
lastOperationTimedOut = false;
6086
}
6187

6288
CommandBatchCursor<RawBsonDocument> getWrapped() {
@@ -107,6 +133,7 @@ public List<T> tryNext() {
107133
@Override
108134
public void close() {
109135
if (!closed.getAndSet(true)) {
136+
resetTimeout();
110137
wrapped.close();
111138
binding.release();
112139
}
@@ -184,22 +211,56 @@ static <T> List<T> convertAndProduceLastId(final List<RawBsonDocument> rawDocume
184211
}
185212

186213
<R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) {
214+
resetTimeout();
215+
try {
216+
R result = execute(function);
217+
lastOperationTimedOut = false;
218+
return result;
219+
} catch (Throwable exception) {
220+
lastOperationTimedOut = isTimeoutException(exception);
221+
throw exception;
222+
}
223+
}
224+
225+
private <R> R execute(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) {
226+
boolean shouldBeResumed = hasPreviousNextTimedOut();
187227
while (true) {
228+
if (shouldBeResumed) {
229+
resumeChangeStream();
230+
}
188231
try {
189232
return function.apply(wrapped);
190233
} catch (Throwable t) {
191234
if (!isResumableError(t, maxWireVersion)) {
192235
throw MongoException.fromThrowableNonNull(t);
193236
}
237+
shouldBeResumed = true;
194238
}
195-
wrapped.close();
239+
}
240+
}
241+
242+
private void resumeChangeStream() {
243+
wrapped.close();
244+
245+
withReadConnectionSource(binding, source -> {
246+
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
247+
return null;
248+
});
249+
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped();
250+
binding.release(); // release the new change stream batch cursor's reference to the binding
251+
}
252+
253+
private boolean hasPreviousNextTimedOut() {
254+
return lastOperationTimedOut && !closed.get();
255+
}
196256

197-
withReadConnectionSource(binding, source -> {
198-
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
199-
return null;
200-
});
201-
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped();
202-
binding.release(); // release the new change stream batch cursor's reference to the binding
257+
private void resetTimeout() {
258+
if (timeoutContext.hasTimeoutMS()) {
259+
timeoutContext.resetTimeout();
203260
}
204261
}
262+
263+
private static boolean isTimeoutException(final Throwable exception) {
264+
return exception instanceof MongoOperationTimeoutException;
265+
}
205266
}

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.CursorType;
1920
import com.mongodb.MongoNamespace;
2021
import com.mongodb.client.model.Collation;
2122
import com.mongodb.client.model.changestream.FullDocument;
2223
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
24+
import com.mongodb.internal.TimeoutContext;
2325
import com.mongodb.internal.async.AsyncBatchCursor;
2426
import com.mongodb.internal.async.SingleResultCallback;
2527
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -42,6 +44,7 @@
4244

4345
import static com.mongodb.assertions.Assertions.assertNotNull;
4446
import static com.mongodb.assertions.Assertions.notNull;
47+
import static com.mongodb.client.cursor.TimeoutMode.CURSOR_LIFETIME;
4548

4649
/**
4750
* An operation that executes an {@code $changeStream} aggregation.
@@ -71,7 +74,7 @@ public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument
7174
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<BsonDocument> pipeline, final Decoder<T> decoder,
7275
final ChangeStreamLevel changeStreamLevel) {
7376
this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC, getAggregateTarget(),
74-
getPipelineCreator());
77+
getPipelineCreator()).cursorType(CursorType.TailableAwait);
7578
this.fullDocument = notNull("fullDocument", fullDocument);
7679
this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange);
7780
this.decoder = notNull("decoder", decoder);
@@ -167,21 +170,49 @@ public ChangeStreamOperation<T> showExpandedEvents(final boolean showExpandedEve
167170
return this;
168171
}
169172

173+
/**
174+
* Gets an aggregate operation with consideration for timeout settings.
175+
* <p>
176+
* Change streams act similarly to tailable awaitData cursors, with identical timeoutMS option behavior.
177+
* Key distinctions include:
178+
* - The timeoutMS option must be applied at the start of the aggregate operation for change streams.
179+
* - Change streams support resumption on next() calls. The driver handles automatic resumption for transient errors.
180+
* <p>
181+
*
182+
* As a result, when {@code timeoutContext.hasTimeoutMS()} the CURSOR_LIFETIME setting is utilized to manage the underlying cursor's
183+
* lifespan in change streams.
184+
*
185+
* @param timeoutContext
186+
* @return An AggregateOperationImpl
187+
*/
188+
private AggregateOperationImpl<RawBsonDocument> getAggregateOperation(final TimeoutContext timeoutContext) {
189+
if (timeoutContext.hasTimeoutMS()) {
190+
return wrapped.timeoutMode(CURSOR_LIFETIME);
191+
}
192+
return wrapped;
193+
}
194+
170195
@Override
171196
public BatchCursor<T> execute(final ReadBinding binding) {
172-
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) wrapped.execute(binding);
197+
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
198+
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding);
199+
cursor.setCloseWithoutTimeoutReset(true);
200+
173201
return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
174202
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
175203
cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion());
176204
}
177205

178206
@Override
179207
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
180-
wrapped.executeAsync(binding, (result, t) -> {
208+
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
209+
getAggregateOperation(timeoutContext).executeAsync(binding, (result, t) -> {
181210
if (t != null) {
182211
callback.onResult(null, t);
183212
} else {
184213
AsyncCommandBatchCursor<RawBsonDocument> cursor = (AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result);
214+
cursor.setCloseWithoutTimeoutReset(true);
215+
185216
callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
186217
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
187218
cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null);

0 commit comments

Comments
 (0)