-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add Client Operation Timeout to Change Streams #1321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
2c163d8
d8760ab
961481e
37a9994
728d611
de200fe
da6eaea
4c44db9
4fd4d96
de6a9b3
cb69eb4
9507840
0a0e606
161667e
ecac544
6798ceb
5b7eeb7
e8f3cf0
84b0971
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
|
||
import com.mongodb.MongoChangeStreamException; | ||
import com.mongodb.MongoException; | ||
import com.mongodb.MongoOperationTimeoutException; | ||
import com.mongodb.ServerAddress; | ||
import com.mongodb.ServerCursor; | ||
import com.mongodb.internal.TimeoutContext; | ||
import com.mongodb.internal.binding.ReadBinding; | ||
import com.mongodb.lang.Nullable; | ||
import org.bson.BsonDocument; | ||
|
@@ -41,22 +43,31 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T | |
private final ReadBinding binding; | ||
private final ChangeStreamOperation<T> changeStreamOperation; | ||
private final int maxWireVersion; | ||
|
||
private final TimeoutContext timeoutContext; | ||
private CommandBatchCursor<RawBsonDocument> wrapped; | ||
private BsonDocument resumeToken; | ||
private final AtomicBoolean closed; | ||
|
||
/** | ||
* This flag is used to manage change stream resumption logic after a timeout error. | ||
* Indicates whether the last {@code next} call resulted in a {@link MongoOperationTimeoutException}. If {@code true}, | ||
* indicates a timeout occurred, prompting an attempt to resume the change stream on the next call. | ||
*/ | ||
private boolean lastOpTimeoutStatus; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming / use of this flag is confusing. Can you clarify in the code / docs that throwing a The reason for this is in 6 months we'll all have forgotten about this special and strange CSOT behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged. I have renamed the field and added additional class level javaDoc. Please let me know if there is anything to add. |
||
|
||
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation, | ||
final CommandBatchCursor<RawBsonDocument> wrapped, | ||
final ReadBinding binding, | ||
@Nullable final BsonDocument resumeToken, | ||
final int maxWireVersion) { | ||
this.timeoutContext = binding.getOperationContext().getTimeoutContext(); | ||
this.changeStreamOperation = changeStreamOperation; | ||
this.binding = binding.retain(); | ||
this.wrapped = wrapped; | ||
this.resumeToken = resumeToken; | ||
this.maxWireVersion = maxWireVersion; | ||
closed = new AtomicBoolean(); | ||
lastOpTimeoutStatus = false; | ||
} | ||
|
||
CommandBatchCursor<RawBsonDocument> getWrapped() { | ||
|
@@ -107,6 +118,7 @@ public List<T> tryNext() { | |
@Override | ||
public void close() { | ||
if (!closed.getAndSet(true)) { | ||
resetTimeout(); | ||
wrapped.close(); | ||
binding.release(); | ||
} | ||
|
@@ -184,6 +196,19 @@ static <T> List<T> convertAndProduceLastId(final List<RawBsonDocument> rawDocume | |
} | ||
|
||
<R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) { | ||
resetTimeout(); | ||
try { | ||
resumeAfterTimeout(); | ||
R result = execute(function); | ||
lastOpTimeoutStatus = false; | ||
return result; | ||
} catch (Throwable exception) { | ||
lastOpTimeoutStatus = isTimeoutException(exception); | ||
throw exception; | ||
} | ||
} | ||
|
||
private <R> R execute(final Function<AggregateResponseBatchCursor<RawBsonDocument>, R> function) { | ||
while (true) { | ||
try { | ||
return function.apply(wrapped); | ||
|
@@ -192,14 +217,34 @@ <R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDoc | |
throw MongoException.fromThrowableNonNull(t); | ||
} | ||
} | ||
wrapped.close(); | ||
resumeChangeStream(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is weird as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've simplified the code to avoid using |
||
} | ||
} | ||
|
||
withReadConnectionSource(binding, source -> { | ||
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); | ||
return null; | ||
}); | ||
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped(); | ||
binding.release(); // release the new change stream batch cursor's reference to the binding | ||
private void resumeChangeStream() { | ||
wrapped.close(); | ||
|
||
withReadConnectionSource(binding, source -> { | ||
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); | ||
return null; | ||
}); | ||
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped(); | ||
binding.release(); // release the new change stream batch cursor's reference to the binding | ||
} | ||
|
||
private void resumeAfterTimeout() { | ||
if (lastOpTimeoutStatus && !closed.get()) { | ||
resumeChangeStream(); | ||
} | ||
} | ||
|
||
private void resetTimeout() { | ||
if (timeoutContext.hasTimeoutMS()) { | ||
timeoutContext.resetTimeout(); | ||
} | ||
} | ||
|
||
private static boolean isTimeoutException(final Throwable exception) { | ||
return exception instanceof MongoOperationTimeoutException; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,13 +24,13 @@ | |
import com.mongodb.ServerAddress; | ||
import com.mongodb.ServerCursor; | ||
import com.mongodb.annotations.ThreadSafe; | ||
import com.mongodb.client.cursor.TimeoutMode; | ||
import com.mongodb.connection.ConnectionDescription; | ||
import com.mongodb.connection.ServerType; | ||
import com.mongodb.internal.VisibleForTesting; | ||
import com.mongodb.internal.binding.ConnectionSource; | ||
import com.mongodb.internal.connection.Connection; | ||
import com.mongodb.internal.connection.OperationContext; | ||
import com.mongodb.client.cursor.TimeoutMode; | ||
import com.mongodb.lang.Nullable; | ||
import org.bson.BsonDocument; | ||
import org.bson.BsonTimestamp; | ||
|
@@ -59,6 +59,9 @@ | |
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> { | ||
|
||
private final MongoNamespace namespace; | ||
/** | ||
* maxAwaitTimeMS | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit - no need for this javadoc |
||
private final long maxTimeMS; | ||
private final Decoder<T> decoder; | ||
@Nullable | ||
|
@@ -258,6 +261,9 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA | |
return commandCursorResult; | ||
} | ||
|
||
void setCloseImmediately(final boolean closeImmediately) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to explain what his is and why its needed? I dont fully understand the reasoning here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation automatically resets the timeoutMS for cursor closure, ensuring that the cursor has sufficient time to release resources. This behavior particularly impacts the The method modifies this behavior with respect to timeouts. When enabled, it prevents the automatic reset of the cursor's timeout for closing. I realized that the method name is not reflecting its purpose. Therefore, I have updated the name to |
||
this.resourceManager.setCloseImmediately(closeImmediately); | ||
} | ||
@ThreadSafe | ||
private static final class ResourceManager extends CursorResourceManager<ConnectionSource, Connection> { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In accordance with the CSOT specifications for the tailable-await cursor:
The logic to determine whether
maxAwaitTimeMS
should be utilized forgetMore
is located in the AggregateOperationImpl and FindOperation.