Skip to content

Add Client Operation Timeout to Change Streams #1321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 14, 2024

Conversation

vbabanin
Copy link
Member

@vbabanin vbabanin commented Feb 28, 2024

Changes made:

  • Trigger timeout error when maxAwaitTimeMS exceeds a timeoutMS.
  • Apply timeoutMS to initial aggregate and subsequent next calls, without appending maxTimeMS to getMore.
  • Attempt to resume change stream on subsequent next call if previous next failed with timeout error.
  • Use maxAwaitTimeMS as maxTimeMS in getMore commands.
  • Preserve change stream validity upon timeout errors.

NOTE

The Reactive Streams specification prevents us from allowing a subsequent next call (event in reactive terms) after a timeout error, conflicting with the CSOT spec requirement not to invalidate the change stream and to try resuming and establishing a new change stream on the server. We immediately let users know about a timeout error, which then closes the stream/publisher.

JAVA-4054

vbabanin and others added 13 commits February 28, 2024 15:01
- Trigger timeout error when maxAwaitTimeMS exceeds a timeoutMS.
- Apply timeoutMS to initial aggregate and subsequent next calls, without appending maxTimeMS to getMore.
- Utilize maxAwaitTimeMS as maxTimeMS in getMore commands.
- Preserve change stream validity upon timeout errors.

JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
JAVA-4054
@@ -165,7 +165,7 @@ public TimeoutSettings getTimeoutSettings() {
}

public long getMaxAwaitTimeMS() {
return hasTimeoutMS() ? 0 : timeoutSettings.getMaxAwaitTimeMS();
Copy link
Member Author

@vbabanin vbabanin Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In accordance with the CSOT specifications for the tailable-await cursor:

If the maxAwaitTimeMS option is set, drivers MUST use it as the maxTimeMS field on getMore commands.

The logic to determine whether maxAwaitTimeMS should be utilized for getMore is located in the AggregateOperationImpl and FindOperation.

@vbabanin vbabanin marked this pull request as ready for review March 5, 2024 07:56
@vbabanin vbabanin requested a review from rozza March 5, 2024 07:56
Copy link
Member

@rozza rozza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions, its looking good just mainly things to help the maintainability in the future.

Also lets document in the sync drivers that a MongoOperationTimeoutException is not terminal for a change stream cursor and calling next() again may produce results.

@@ -59,6 +59,9 @@
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {

private final MongoNamespace namespace;
/**
* maxAwaitTimeMS
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - no need for this javadoc

@@ -258,6 +261,9 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
return commandCursorResult;
}

void setCloseImmediately(final boolean closeImmediately) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to explain what his is and why its needed? I dont fully understand the reasoning here.

Copy link
Member Author

@vbabanin vbabanin Mar 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation automatically resets the timeoutMS for cursor closure, ensuring that the cursor has sufficient time to release resources. This behavior particularly impacts the ChangeStreamCursor which enforces a unified timeout duration for the entire 'next' operation, including resume attempts and the internal process of closing the cursor.

The method modifies this behavior with respect to timeouts. When enabled, it prevents the automatic reset of the cursor's timeout for closing.

I realized that the method name is not reflecting its purpose. Therefore, I have updated the name to setCloseWithoutTimeoutReset and added the JavaDoc documentation for clarity.

@@ -192,14 +217,34 @@ <R> R resumeableOperation(final Function<AggregateResponseBatchCursor<RawBsonDoc
throw MongoException.fromThrowableNonNull(t);
}
}
wrapped.close();
resumeChangeStream();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is weird as resumeChangeStream is also called via resumeAfterTimeout - can this be simplified?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've simplified the code to avoid using resumeAfterTimeout. If you have specific suggestions, please let me know.

* Indicates whether the last {@code next} call resulted in a {@link MongoOperationTimeoutException}. If {@code true},
* indicates a timeout occurred, prompting an attempt to resume the change stream on the next call.
*/
private boolean lastOpTimeoutStatus;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming / use of this flag is confusing. Can you clarify in the code / docs that throwing a MongoOperationTimeoutException doesn't invalidate the cursor.

The reason for this is in 6 months we'll all have forgotten about this special and strange CSOT behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. I have renamed the field and added additional class level javaDoc. Please let me know if there is anything to add.

vbabanin added 2 commits March 6, 2024 16:50
Remove additional call to resumeChangeStream.

JAVA-4054
@vbabanin vbabanin requested a review from rozza March 7, 2024 00:56
JAVA-4054
Copy link
Member

@rozza rozza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking really good.

Need to add to the public ChangeStreamIterable Javadoc so its user discoverable.

 * <p>
 * A {@link MongoOperationTimeoutException} does not invalidate the {@link ChangeStreamBatchCursor}, but is immediately propagated to the caller.
 * Subsequent method call will attempt to resume operation by establishing a new change stream on the server, without doing {@code getMore}
 * request first.
 * </p>

JAVA-5322
@vbabanin vbabanin requested a review from rozza March 12, 2024 06:17
Copy link
Member

@rozza rozza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@rozza
Copy link
Member

rozza commented Mar 13, 2024

I noticed that the test failures are unrelated but we should keep an eye on them as there maybe more racy tests to review / fix in the future.

@vbabanin vbabanin merged commit ab1ec84 into mongodb:CSOT Mar 14, 2024
50 of 58 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants