Skip to content

Commit 65b187c

Browse files
authored
fix: Ensure reactive transaction subsequent runs work during streaming (#1636)
When using reactive transaction, it is possible for one thread to initiate multiple query runs with subsequent results streaming, while the driver thread would be supplying the respective results and handling the streaming. Since all of this uses the same Bolt connection, it is important to make sure it is shared correctly to avoid mismanagement of the Bolt message handling.
1 parent 6c80601 commit 65b187c

20 files changed

+393
-172
lines changed

driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Objects;
2222
import java.util.Set;
2323
import java.util.concurrent.CompletionStage;
24+
import java.util.function.Supplier;
2425
import org.neo4j.bolt.connection.AccessMode;
2526
import org.neo4j.bolt.connection.AuthInfo;
2627
import org.neo4j.bolt.connection.AuthTokens;
@@ -48,8 +49,8 @@ final class AdaptingDriverBoltConnection implements DriverBoltConnection {
4849
}
4950

5051
@Override
51-
public CompletionStage<DriverBoltConnection> onLoop() {
52-
return connection.onLoop().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
52+
public <T> CompletionStage<T> onLoop(Supplier<T> supplier) {
53+
return connection.onLoop(supplier);
5354
}
5455

5556
@Override

driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.Set;
2222
import java.util.concurrent.CompletionStage;
23+
import java.util.function.Supplier;
2324
import org.neo4j.bolt.connection.AccessMode;
2425
import org.neo4j.bolt.connection.AuthInfo;
2526
import org.neo4j.bolt.connection.BoltConnectionState;
@@ -32,7 +33,7 @@
3233
import org.neo4j.driver.Value;
3334

3435
public interface DriverBoltConnection {
35-
CompletionStage<DriverBoltConnection> onLoop();
36+
<T> CompletionStage<T> onLoop(Supplier<T> supplier);
3637

3738
CompletionStage<DriverBoltConnection> route(
3839
DatabaseName databaseName, String impersonatedUser, Set<String> bookmarks);

driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Objects;
2222
import java.util.Set;
2323
import java.util.concurrent.CompletionStage;
24+
import java.util.function.Supplier;
2425
import org.neo4j.bolt.connection.AccessMode;
2526
import org.neo4j.bolt.connection.AuthInfo;
2627
import org.neo4j.bolt.connection.BoltConnectionState;
@@ -42,8 +43,8 @@ protected DelegatingBoltConnection(DriverBoltConnection delegate) {
4243
}
4344

4445
@Override
45-
public CompletionStage<DriverBoltConnection> onLoop() {
46-
return delegate.onLoop().thenApply(ignored -> this);
46+
public <T> CompletionStage<T> onLoop(Supplier<T> supplier) {
47+
return delegate.onLoop(supplier);
4748
}
4849

4950
@Override

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.CompletionException;
3131
import java.util.concurrent.CompletionStage;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.TimeoutException;
3334
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.concurrent.locks.Condition;
36+
import java.util.concurrent.locks.Lock;
3437
import java.util.function.Consumer;
3538
import java.util.function.Function;
3639
import java.util.function.Supplier;
@@ -707,6 +710,7 @@ public AuthToken overrideAuthToken() {
707710
}
708711

709712
public static class RunRxResponseHandler implements DriverResponseHandler {
713+
private static final Lock NOOP_LOCK = new NoopLock();
710714
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
711715
private final Logging logging;
712716
private final DriverBoltConnection connection;
@@ -763,8 +767,8 @@ public void onComplete() {
763767
if (error != null) {
764768
runFailed.set(true);
765769
}
766-
cursorFuture.complete(
767-
new RxResultCursorImpl(connection, query, runSummary, error, bookmarkConsumer, true, logging));
770+
cursorFuture.complete(new RxResultCursorImpl(
771+
connection, NOOP_LOCK, query, runSummary, error, bookmarkConsumer, true, logging));
768772
} else {
769773
var message = ignoredCount > 0
770774
? "Run exchange contains ignored messages."
@@ -793,4 +797,30 @@ public boolean handleSecurityException(AuthToken authToken, SecurityException ex
793797
return false;
794798
}
795799
}
800+
801+
private static class NoopLock implements Lock {
802+
@Override
803+
public void lock() {}
804+
805+
@Override
806+
public void lockInterruptibly() {}
807+
808+
@Override
809+
public boolean tryLock() {
810+
return true;
811+
}
812+
813+
@Override
814+
public boolean tryLock(long time, TimeUnit unit) {
815+
return true;
816+
}
817+
818+
@Override
819+
public void unlock() {}
820+
821+
@Override
822+
public Condition newCondition() {
823+
return null;
824+
}
825+
}
796826
}

driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java

Lines changed: 65 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.CompletionException;
2222
import java.util.concurrent.CompletionStage;
2323
import java.util.function.Consumer;
24+
import java.util.function.Function;
2425
import org.neo4j.driver.Logger;
2526
import org.neo4j.driver.Logging;
2627
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
@@ -48,81 +49,82 @@ public TerminationAwareBoltConnection(
4849
public CompletionStage<DriverBoltConnection> clearAndReset() {
4950
var future = new CompletableFuture<DriverBoltConnection>();
5051
var thisVal = this;
51-
52-
delegate.onLoop()
53-
.thenCompose(connection -> executor.execute(ignored -> connection
54-
.clear()
55-
.thenCompose(DriverBoltConnection::reset)
56-
.thenCompose(conn -> conn.flush(new DriverResponseHandler() {
57-
Throwable throwable = null;
58-
59-
@Override
60-
public void onError(Throwable throwable) {
61-
log.error("Unexpected error occurred while resetting connection", throwable);
62-
throwableConsumer.accept(throwable);
63-
this.throwable = throwable;
64-
}
65-
66-
@Override
67-
public void onComplete() {
68-
if (throwable != null) {
69-
future.completeExceptionally(throwable);
70-
} else {
71-
future.complete(thisVal);
72-
}
73-
}
74-
}))))
52+
delegate.onLoop(() -> executor.execute(ignored -> clearAndResetBolt(future)))
53+
.thenCompose(Function.identity())
7554
.whenComplete((ignored, throwable) -> {
7655
if (throwable != null) {
7756
throwableConsumer.accept(throwable);
7857
future.completeExceptionally(throwable);
7958
}
8059
});
81-
8260
return future;
8361
}
8462

63+
private CompletionStage<Void> clearAndResetBolt(CompletableFuture<DriverBoltConnection> future) {
64+
var thisVal = this;
65+
return delegate.clear()
66+
.thenCompose(DriverBoltConnection::reset)
67+
.thenCompose(conn -> conn.flush(new DriverResponseHandler() {
68+
Throwable throwable = null;
69+
70+
@Override
71+
public void onError(Throwable throwable) {
72+
log.error("Unexpected error occurred while resetting connection", throwable);
73+
throwableConsumer.accept(throwable);
74+
this.throwable = throwable;
75+
}
76+
77+
@Override
78+
public void onComplete() {
79+
if (throwable != null) {
80+
future.completeExceptionally(throwable);
81+
} else {
82+
future.complete(thisVal);
83+
}
84+
}
85+
}));
86+
}
87+
8588
@Override
8689
public CompletionStage<Void> flush(DriverResponseHandler handler) {
87-
return delegate.onLoop()
88-
.thenCompose(connection -> executor.execute(causeOfTermination -> {
89-
if (causeOfTermination == null) {
90-
log.trace("This connection is active, will flush");
91-
var terminationAwareResponseHandler =
92-
new TerminationAwareResponseHandler(logging, handler, executor, throwableConsumer);
93-
return delegate.flush(terminationAwareResponseHandler).handle((ignored, flushThrowable) -> {
94-
flushThrowable = Futures.completionExceptionCause(flushThrowable);
95-
if (flushThrowable != null) {
96-
if (log.isTraceEnabled()) {
97-
log.error("The flush has failed", flushThrowable);
98-
}
99-
var flushThrowableRef = flushThrowable;
100-
flushThrowable = executor.execute(existingThrowable -> {
101-
if (existingThrowable != null) {
102-
log.trace(
103-
"The flush has failed, but there is an existing %s", existingThrowable);
104-
return existingThrowable;
105-
} else {
106-
throwableConsumer.accept(flushThrowableRef);
107-
return flushThrowableRef;
108-
}
109-
});
110-
// rethrow
111-
if (flushThrowable instanceof RuntimeException runtimeException) {
112-
throw runtimeException;
113-
} else {
114-
throw new CompletionException(flushThrowable);
115-
}
116-
} else {
117-
return ignored;
118-
}
119-
});
90+
return delegate.onLoop(() -> executor.execute(causeOfTermination -> flushBolt(causeOfTermination, handler)))
91+
.thenCompose(Function.identity());
92+
}
93+
94+
private CompletionStage<Void> flushBolt(Throwable causeOfTermination, DriverResponseHandler handler) {
95+
if (causeOfTermination == null) {
96+
log.trace("This connection is active, will flush");
97+
var terminationAwareResponseHandler =
98+
new TerminationAwareResponseHandler(logging, handler, executor, throwableConsumer);
99+
return delegate.flush(terminationAwareResponseHandler).handle((ignored, flushThrowable) -> {
100+
flushThrowable = Futures.completionExceptionCause(flushThrowable);
101+
if (flushThrowable != null) {
102+
if (log.isTraceEnabled()) {
103+
log.error("The flush has failed", flushThrowable);
104+
}
105+
var flushThrowableRef = flushThrowable;
106+
flushThrowable = executor.execute(existingThrowable -> {
107+
if (existingThrowable != null) {
108+
log.trace("The flush has failed, but there is an existing %s", existingThrowable);
109+
return existingThrowable;
110+
} else {
111+
throwableConsumer.accept(flushThrowableRef);
112+
return flushThrowableRef;
113+
}
114+
});
115+
// rethrow
116+
if (flushThrowable instanceof RuntimeException runtimeException) {
117+
throw runtimeException;
120118
} else {
121-
// there is an existing error
122-
return connection
123-
.clear()
124-
.thenCompose(ignored -> CompletableFuture.failedStage(causeOfTermination));
119+
throw new CompletionException(flushThrowable);
125120
}
126-
}));
121+
} else {
122+
return ignored;
123+
}
124+
});
125+
} else {
126+
// there is an existing error
127+
return delegate.clear().thenCompose(ignored -> CompletableFuture.failedStage(causeOfTermination));
128+
}
127129
}
128130
}

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ private enum State {
106106
private final ResultCursorsHolder resultCursors;
107107
private final long fetchSize;
108108
private final Lock lock = new ReentrantLock();
109+
private final Lock connectionLock = new ReentrantLock();
109110
private State state = State.ACTIVE;
110111
private CompletableFuture<Void> commitFuture;
111112
private CompletableFuture<Void> rollbackFuture;
@@ -257,9 +258,17 @@ public CompletionStage<ResultCursor> runAsync(Query query) {
257258
public CompletionStage<RxResultCursor> runRx(Query query) {
258259
ensureCanRunQueries();
259260
var parameters = query.parameters().asMap(Values::value);
260-
var responseHandler = new RunRxResponseHandler(logging, apiTelemetryWork, beginFuture, connection, query);
261-
var flushStage =
262-
connection.run(query.text(), parameters).thenCompose(ignored2 -> connection.flush(responseHandler));
261+
var responseHandler =
262+
new RunRxResponseHandler(logging, apiTelemetryWork, beginFuture, connection, connectionLock, query);
263+
var flushStage = connection
264+
.onLoop(() -> {
265+
connectionLock.lock();
266+
return connection
267+
.run(query.text(), parameters)
268+
.thenCompose(conn -> conn.flush(responseHandler))
269+
.whenComplete((ignored, throwable) -> connectionLock.unlock());
270+
})
271+
.thenCompose(Function.identity());
263272
return beginFuture.thenCompose(ignored -> {
264273
var cursorStage = flushStage.thenCompose(flushResult -> responseHandler.cursorFuture);
265274
resultCursors.add(cursorStage);
@@ -670,6 +679,7 @@ private static class RunRxResponseHandler implements DriverResponseHandler {
670679
private final ApiTelemetryWork apiTelemetryWork;
671680
private final CompletableFuture<UnmanagedTransaction> beginFuture;
672681
private final DriverBoltConnection connection;
682+
private final Lock connectionLock;
673683
private final Query query;
674684
private Throwable error;
675685
private RunSummary runSummary;
@@ -680,11 +690,13 @@ private RunRxResponseHandler(
680690
ApiTelemetryWork apiTelemetryWork,
681691
CompletableFuture<UnmanagedTransaction> beginFuture,
682692
DriverBoltConnection connection,
693+
Lock connectionLock,
683694
Query query) {
684695
this.logging = logging;
685696
this.apiTelemetryWork = apiTelemetryWork;
686697
this.beginFuture = beginFuture;
687698
this.connection = connection;
699+
this.connectionLock = connectionLock;
688700
this.query = query;
689701
}
690702

@@ -720,13 +732,13 @@ public void onIgnored() {
720732
public void onComplete() {
721733
if (error != null) {
722734
if (!beginFuture.completeExceptionally(error)) {
723-
cursorFuture.complete(
724-
new RxResultCursorImpl(connection, query, null, error, bookmark -> {}, false, logging));
735+
cursorFuture.complete(new RxResultCursorImpl(
736+
connection, connectionLock, query, null, error, bookmark -> {}, false, logging));
725737
}
726738
} else {
727739
if (runSummary != null) {
728740
cursorFuture.complete(new RxResultCursorImpl(
729-
connection, query, runSummary, null, bookmark -> {}, false, logging));
741+
connection, connectionLock, query, runSummary, null, bookmark -> {}, false, logging));
730742
} else {
731743
var message =
732744
ignoredCount > 0 ? "Run exchange contains ignored messages" : "Unexpected state during run";

driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323
import java.util.Set;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.function.Supplier;
2526
import org.neo4j.bolt.connection.AccessMode;
2627
import org.neo4j.bolt.connection.AuthInfo;
2728
import org.neo4j.bolt.connection.AuthToken;
@@ -46,8 +47,8 @@ public ListeningBoltConnection(BoltConnection delegate, BoltConnectionListener b
4647
}
4748

4849
@Override
49-
public CompletionStage<BoltConnection> onLoop() {
50-
return delegate.onLoop().thenApply(ignored -> this);
50+
public <T> CompletionStage<T> onLoop(Supplier<T> supplier) {
51+
return delegate.onLoop(supplier);
5152
}
5253

5354
@Override

0 commit comments

Comments
 (0)