-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add Client Operation Timeout to Change Streams #1321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Trigger timeout error when maxAwaitTimeMS exceeds a timeoutMS. - Apply timeoutMS to initial aggregate and subsequent next calls, without appending maxTimeMS to getMore. - Utilize maxAwaitTimeMS as maxTimeMS in getMore commands. - Preserve change stream validity upon timeout errors. JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
@@ -165,7 +165,7 @@ public TimeoutSettings getTimeoutSettings() { | |||
} | |||
|
|||
public long getMaxAwaitTimeMS() { | |||
return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In accordance with the CSOT specifications for the tailable-await cursor:
If the maxAwaitTimeMS option is set, drivers MUST use it as the maxTimeMS field on getMore commands.
The logic to determine whether maxAwaitTimeMS
should be utilized for getMore
is located in the AggregateOperationImpl and FindOperation.
JAVA-4054
JAVA-4054
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of questions, its looking good just mainly things to help the maintainability in the future.
Also lets document in the sync drivers that a MongoOperationTimeoutException
is not terminal for a change stream cursor and calling next()
again may produce results.
@@ -59,6 +59,9 @@ | |||
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> { | |||
|
|||
private final MongoNamespace namespace; | |||
/** | |||
* maxAwaitTimeMS | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - no need for this javadoc
@@ -258,6 +261,9 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA | |||
return commandCursorResult; | |||
} | |||
|
|||
void setCloseImmediately(final boolean closeImmediately) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to explain what his is and why its needed? I dont fully understand the reasoning here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation automatically resets the timeoutMS for cursor closure, ensuring that the cursor has sufficient time to release resources. This behavior particularly impacts the ChangeStreamCursor
which enforces a unified timeout duration for the entire 'next' operation, including resume attempts and the internal process of closing the cursor.
The method modifies this behavior with respect to timeouts. When enabled, it prevents the automatic reset of the cursor's timeout for closing.
I realized that the method name is not reflecting its purpose. Therefore, I have updated the name to setCloseWithoutTimeoutReset
and added the JavaDoc documentation for clarity.
driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java
Outdated
Show resolved
Hide resolved
@@ -192,14 +217,34 @@ <R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDoc | |||
throw MongoException.fromThrowableNonNull(t); | |||
} | |||
} | |||
wrapped.close(); | |||
resumeChangeStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is weird as resumeChangeStream
is also called via resumeAfterTimeout
- can this be simplified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've simplified the code to avoid using resumeAfterTimeout
. If you have specific suggestions, please let me know.
* Indicates whether the last {@code next} call resulted in a {@link MongoOperationTimeoutException}. If {@code true}, | ||
* indicates a timeout occurred, prompting an attempt to resume the change stream on the next call. | ||
*/ | ||
private boolean lastOpTimeoutStatus; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The naming / use of this flag is confusing. Can you clarify in the code / docs that throwing a MongoOperationTimeoutException
doesn't invalidate the cursor.
The reason for this is in 6 months we'll all have forgotten about this special and strange CSOT behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledged. I have renamed the field and added additional class level javaDoc. Please let me know if there is anything to add.
Remove additional call to resumeChangeStream. JAVA-4054
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking really good.
Need to add to the public ChangeStreamIterable
Javadoc so its user discoverable.
* <p>
* A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller.
* Subsequent method call will attempt to resume operation by establishing a new change stream on the server, without doing {@code getMore}
* request first.
* </p>
JAVA-5322
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
I noticed that the test failures are unrelated but we should keep an eye on them as there maybe more racy tests to review / fix in the future. |
Changes made:
NOTE
The Reactive Streams specification prevents us from allowing a subsequent next call (event in reactive terms) after a timeout error, conflicting with the CSOT spec requirement not to invalidate the change stream and to try resuming and establishing a new change stream on the server. We immediately let users know about a timeout error, which then closes the stream/publisher.
JAVA-4054