Skip to content

Update RxResultCursorImpl #1580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public CompletionStage<RxResultCursor> runRx(
apiTelemetryWork.setEnabled(!telemetryDisabled);
var runFailed = new AtomicBoolean(false);
var responseHandler =
new RunRxResponseHandler(connection, query, this::handleNewBookmark, runFailed);
new RunRxResponseHandler(logging, connection, query, this::handleNewBookmark, runFailed);
var cursorStage = apiTelemetryWork
.pipelineTelemetryIfEnabled(connection)
.thenCompose(conn -> conn.runInAutoCommitTransaction(
Expand Down Expand Up @@ -808,6 +808,7 @@ public AuthToken overrideAuthToken() {

public static class RunRxResponseHandler implements ResponseHandler {
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
private final Logging logging;
private final BoltConnection connection;
private final Query query;
private final Consumer<DatabaseBookmark> bookmarkConsumer;
Expand All @@ -817,10 +818,12 @@ public static class RunRxResponseHandler implements ResponseHandler {
private int ignoredCount;

public RunRxResponseHandler(
Logging logging,
BoltConnection connection,
Query query,
Consumer<DatabaseBookmark> bookmarkConsumer,
AtomicBoolean runFailed) {
this.logging = logging;
this.connection = connection;
this.query = query;
this.bookmarkConsumer = bookmarkConsumer;
Expand Down Expand Up @@ -867,11 +870,11 @@ public void onComplete() {
query,
runSummary,
error,
() -> null,
bookmarkConsumer,
(ignored) -> {},
true,
() -> null));
() -> null,
logging));
} else {
var message = ignoredCount > 0
? "Run exchange contains ignored messages."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.util.Futures;

public class ResultCursorsHolder {
private final List<CompletionStage<? extends FailableCursor>> cursorStages = new ArrayList<>();
Expand All @@ -35,8 +36,11 @@ void add(CompletionStage<? extends FailableCursor> cursorStage) {
cursorStages.add(cursorStage);
}
cursorStage.thenCompose(FailableCursor::consumed).whenComplete((ignored, throwable) -> {
throwable = Futures.completionExceptionCause(throwable);
synchronized (this) {
cursorStages.remove(cursorStage);
if (throwable == null) {
cursorStages.remove(cursorStage);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private enum State {
"Can't rollback, transaction has been requested to be committed";
private static final EnumSet<State> OPEN_STATES = EnumSet.of(State.ACTIVE, State.TERMINATED);

private final Logging logging;
private final TerminationAwareBoltConnection connection;
private final Consumer<DatabaseBookmark> bookmarkConsumer;
private final ResultCursorsHolder resultCursors;
Expand Down Expand Up @@ -153,6 +154,7 @@ protected UnmanagedTransaction(
NotificationConfig notificationConfig,
ApiTelemetryWork apiTelemetryWork,
Logging logging) {
this.logging = logging;
this.connection = new TerminationAwareBoltConnection(connection, this);
this.databaseName = databaseName;
this.accessMode = accessMode;
Expand Down Expand Up @@ -254,6 +256,7 @@ public CompletionStage<RxResultCursor> runRx(Query query) {
ensureCanRunQueries();
var parameters = query.parameters().asMap(Values::value);
var responseHandler = new RunRxResponseHandler(
logging,
apiTelemetryWork,
() -> executeWithLock(lock, () -> causeOfTermination),
this::markTerminated,
Expand Down Expand Up @@ -673,6 +676,7 @@ public void onComplete() {

private static class RunRxResponseHandler implements ResponseHandler {
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
private final Logging logging;
private final ApiTelemetryWork apiTelemetryWork;
private final Supplier<Throwable> termSupplier;
private final Consumer<Throwable> markTerminated;
Expand All @@ -685,13 +689,15 @@ private static class RunRxResponseHandler implements ResponseHandler {
private int ignoredCount;

private RunRxResponseHandler(
Logging logging,
ApiTelemetryWork apiTelemetryWork,
Supplier<Throwable> termSupplier,
Consumer<Throwable> markTerminated,
CompletableFuture<UnmanagedTransaction> beginFuture,
UnmanagedTransaction transaction,
BoltConnection connection,
Query query) {
this.logging = logging;
this.apiTelemetryWork = apiTelemetryWork;
this.termSupplier = termSupplier;
this.markTerminated = markTerminated;
Expand Down Expand Up @@ -747,11 +753,11 @@ public void onComplete() {
query,
null,
error,
termSupplier,
bookmark -> {},
transaction::markTerminated,
false,
termSupplier));
termSupplier,
logging));
}
} else {
if (runSummary != null) {
Expand All @@ -760,11 +766,11 @@ public void onComplete() {
query,
runSummary,
null,
termSupplier,
bookmark -> {},
transaction::markTerminated,
false,
termSupplier));
termSupplier,
logging));
} else {
var throwable = termSupplier.get();
if (throwable == null) {
Expand Down
Loading