Skip to content

Commit 6f8af77

Browse files
authored
Add support for cancellation on reactive session run (#1457)
This update brings support for cancelling subscription on reactive session `run` methods, like: - `org.neo4j.driver.reactive.ReactiveSession.run(..)` - `org.neo4j.driver.reactivestreams.ReactiveSession.run(..)` If subscription is cancelled before an instance of the `ReactiveResult` is published, the driver will rollback the initiated query by sending the Bolt `RESET` message and will release the network connection back to its connection pool. This will be done in the background. Once the `ReactiveResult` is published, the cancellation with the rollback is no longer possible. In addition, this update makes the driver wait until subscriber subscribes to the publisher before dispatching the query to the server. Previously, the driver would do that immediately upon calling the `run` method.
1 parent e2c60a1 commit 6f8af77

16 files changed

+346
-78
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,13 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
122122
.thenApply(cursor -> cursor); // convert the return type
123123
}
124124

125-
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config) {
125+
public CompletionStage<RxResultCursor> runRx(
126+
Query query, TransactionConfig config, CompletionStage<RxResultCursor> cursorPublishStage) {
126127
var newResultCursorStage = buildResultCursorFactory(query, config).thenCompose(ResultCursorFactory::rxResult);
127128

128-
resultCursorStage = newResultCursorStage.exceptionally(error -> null);
129+
resultCursorStage = newResultCursorStage
130+
.thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage)
131+
.exceptionally(throwable -> null);
129132
return newResultCursorStage;
130133
}
131134

driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public CompletionStage<AsyncResultCursor> asyncResult() {
7474
@Override
7575
public CompletionStage<RxResultCursor> rxResult() {
7676
connection.writeAndFlush(runMessage, runHandler);
77-
return runFuture.handle((ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler));
77+
return runFuture.handle(
78+
(ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler, connection::release));
7879
}
7980
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,13 @@ public interface RxResultCursor extends Subscription, FailableCursor {
3636
boolean isDone();
3737

3838
Throwable getRunError();
39+
40+
/**
41+
* Rolls back this instance by releasing connection with RESET.
42+
* <p>
43+
* This must never be called on a published instance.
44+
* @return reset completion stage
45+
* @since 5.11
46+
*/
47+
CompletionStage<Void> rollback();
3948
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionStage;
3030
import java.util.function.BiConsumer;
31+
import java.util.function.Supplier;
3132
import org.neo4j.driver.Record;
3233
import org.neo4j.driver.exceptions.TransactionNestingException;
3334
import org.neo4j.driver.internal.handlers.RunResponseHandler;
@@ -41,23 +42,30 @@ public class RxResultCursorImpl implements RxResultCursor {
4142
private final RunResponseHandler runHandler;
4243
private final PullResponseHandler pullHandler;
4344
private final Throwable runResponseError;
45+
private final Supplier<CompletionStage<Void>> connectionReleaseSupplier;
4446
private boolean runErrorSurfaced;
4547
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
4648
private boolean summaryFutureExposed;
4749
private boolean resultConsumed;
4850
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
4951

52+
// for testing only
5053
public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) {
51-
this(null, runHandler, pullHandler);
54+
this(null, runHandler, pullHandler, () -> CompletableFuture.completedFuture(null));
5255
}
5356

54-
public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler) {
57+
public RxResultCursorImpl(
58+
Throwable runError,
59+
RunResponseHandler runHandler,
60+
PullResponseHandler pullHandler,
61+
Supplier<CompletionStage<Void>> connectionReleaseSupplier) {
5562
Objects.requireNonNull(runHandler);
5663
Objects.requireNonNull(pullHandler);
5764

5865
this.runResponseError = runError;
5966
this.runHandler = runHandler;
6067
this.pullHandler = pullHandler;
68+
this.connectionReleaseSupplier = connectionReleaseSupplier;
6169
installSummaryConsumer();
6270
}
6371

@@ -130,6 +138,12 @@ public Throwable getRunError() {
130138
return runResponseError;
131139
}
132140

141+
@Override
142+
public CompletionStage<Void> rollback() {
143+
summaryFuture.complete(null);
144+
return connectionReleaseSupplier.get();
145+
}
146+
133147
public CompletionStage<ResultSummary> summaryStage() {
134148
if (!isDone() && !resultConsumed) // the summary is called before record streaming
135149
{

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,19 @@
2323

2424
import java.util.Set;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionException;
27+
import java.util.concurrent.CompletionStage;
28+
import java.util.concurrent.atomic.AtomicReference;
2629
import java.util.function.Function;
2730
import org.neo4j.driver.AccessMode;
2831
import org.neo4j.driver.Bookmark;
32+
import org.neo4j.driver.Query;
2933
import org.neo4j.driver.TransactionConfig;
3034
import org.neo4j.driver.exceptions.ClientException;
3135
import org.neo4j.driver.exceptions.TransactionNestingException;
3236
import org.neo4j.driver.internal.async.NetworkSession;
3337
import org.neo4j.driver.internal.async.UnmanagedTransaction;
38+
import org.neo4j.driver.internal.cursor.RxResultCursor;
3439
import org.neo4j.driver.internal.util.Futures;
3540
import org.neo4j.driver.reactive.RxResult;
3641
import org.neo4j.driver.reactivestreams.ReactiveResult;
@@ -142,6 +147,73 @@ public Set<Bookmark> lastBookmarks() {
142147
return session.lastBookmarks();
143148
}
144149

150+
protected <T> Publisher<T> run(Query query, TransactionConfig config, Function<RxResultCursor, T> cursorToResult) {
151+
var cursorPublishFuture = new CompletableFuture<RxResultCursor>();
152+
var cursorReference = new AtomicReference<RxResultCursor>();
153+
154+
return createSingleItemPublisher(
155+
() -> runAsStage(query, config, cursorPublishFuture)
156+
.thenApply(cursor -> {
157+
cursorReference.set(cursor);
158+
return cursor;
159+
})
160+
.thenApply(cursorToResult),
161+
() -> new IllegalStateException(
162+
"Unexpected condition, run call has completed successfully with result being null"),
163+
value -> {
164+
if (value != null) {
165+
cursorReference.get().rollback().whenComplete((unused, throwable) -> {
166+
if (throwable != null) {
167+
cursorPublishFuture.completeExceptionally(throwable);
168+
} else {
169+
cursorPublishFuture.complete(null);
170+
}
171+
});
172+
}
173+
})
174+
.doOnNext(value -> cursorPublishFuture.complete(cursorReference.get()))
175+
.doOnError(cursorPublishFuture::completeExceptionally);
176+
}
177+
178+
private CompletionStage<RxResultCursor> runAsStage(
179+
Query query, TransactionConfig config, CompletionStage<RxResultCursor> finalStage) {
180+
CompletionStage<RxResultCursor> cursorStage;
181+
try {
182+
cursorStage = session.runRx(query, config, finalStage);
183+
} catch (Throwable t) {
184+
cursorStage = Futures.failedFuture(t);
185+
}
186+
187+
return cursorStage
188+
.handle((cursor, throwable) -> {
189+
if (throwable != null) {
190+
return this.<RxResultCursor>releaseConnectionAndRethrow(throwable);
191+
} else {
192+
var runError = cursor.getRunError();
193+
if (runError != null) {
194+
return this.<RxResultCursor>releaseConnectionAndRethrow(runError);
195+
} else {
196+
return CompletableFuture.completedFuture(cursor);
197+
}
198+
}
199+
})
200+
.thenCompose(stage -> stage);
201+
}
202+
203+
private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable throwable) {
204+
return session.releaseConnectionAsync().handle((ignored, releaseThrowable) -> {
205+
if (releaseThrowable != null) {
206+
throw Futures.combineErrors(throwable, releaseThrowable);
207+
} else {
208+
if (throwable instanceof RuntimeException e) {
209+
throw e;
210+
} else {
211+
throw new CompletionException(throwable);
212+
}
213+
}
214+
});
215+
}
216+
145217
protected <T> Publisher<T> doClose() {
146218
return createEmptyPublisher(session::closeAsync);
147219
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,17 @@
2323

2424
import java.util.HashSet;
2525
import java.util.Set;
26-
import java.util.concurrent.CompletionStage;
2726
import java.util.concurrent.Flow.Publisher;
2827
import org.neo4j.driver.AccessMode;
2928
import org.neo4j.driver.Bookmark;
3029
import org.neo4j.driver.Query;
3130
import org.neo4j.driver.TransactionConfig;
3231
import org.neo4j.driver.internal.async.NetworkSession;
3332
import org.neo4j.driver.internal.async.UnmanagedTransaction;
34-
import org.neo4j.driver.internal.cursor.RxResultCursor;
35-
import org.neo4j.driver.internal.util.Futures;
3633
import org.neo4j.driver.reactive.ReactiveResult;
3734
import org.neo4j.driver.reactive.ReactiveSession;
3835
import org.neo4j.driver.reactive.ReactiveTransaction;
3936
import org.neo4j.driver.reactive.ReactiveTransactionCallback;
40-
import reactor.core.publisher.Mono;
4137

4238
public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
4339
implements ReactiveSession, BaseReactiveQueryRunner {
@@ -89,30 +85,7 @@ public Publisher<ReactiveResult> run(Query query) {
8985

9086
@Override
9187
public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
92-
CompletionStage<RxResultCursor> cursorStage;
93-
try {
94-
cursorStage = session.runRx(query, config);
95-
} catch (Throwable t) {
96-
cursorStage = Futures.failedFuture(t);
97-
}
98-
99-
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
100-
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
101-
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
102-
.then(Mono.error(error)))
103-
.flatMap(cursor -> {
104-
Mono<RxResultCursor> publisher;
105-
var runError = cursor.getRunError();
106-
if (runError != null) {
107-
publisher = Mono.fromCompletionStage(session.releaseConnectionAsync())
108-
.onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError))
109-
.then(Mono.error(runError));
110-
} else {
111-
publisher = Mono.just(cursor);
112-
}
113-
return publisher;
114-
})
115-
.map(InternalReactiveResult::new));
88+
return publisherToFlowPublisher(run(query, config, InternalReactiveResult::new));
11689
}
11790

11891
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public RxResult run(Query query) {
9696
public RxResult run(Query query, TransactionConfig config) {
9797
return new InternalRxResult(() -> {
9898
var resultCursorFuture = new CompletableFuture<RxResultCursor>();
99-
session.runRx(query, config).whenComplete((cursor, completionError) -> {
99+
session.runRx(query, config, resultCursorFuture).whenComplete((cursor, completionError) -> {
100100
if (cursor != null) {
101101
resultCursorFuture.complete(cursor);
102102
} else {

driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static <T> Publisher<T> createEmptyPublisher(Supplier<CompletionStage<Voi
5656
* @param <T> the type of the item to publish.
5757
* @return A publisher that succeeds exactly one item or fails with an error.
5858
*/
59-
public static <T> Publisher<T> createSingleItemPublisher(
59+
public static <T> Mono<T> createSingleItemPublisher(
6060
Supplier<CompletionStage<T>> supplier,
6161
Supplier<Throwable> nullResultThrowableSupplier,
6262
Consumer<T> cancellationHandler) {

driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,18 @@
2020

2121
import java.util.HashSet;
2222
import java.util.Set;
23-
import java.util.concurrent.CompletionStage;
2423
import org.neo4j.driver.AccessMode;
2524
import org.neo4j.driver.Bookmark;
2625
import org.neo4j.driver.Query;
2726
import org.neo4j.driver.TransactionConfig;
2827
import org.neo4j.driver.internal.async.NetworkSession;
2928
import org.neo4j.driver.internal.async.UnmanagedTransaction;
30-
import org.neo4j.driver.internal.cursor.RxResultCursor;
3129
import org.neo4j.driver.internal.reactive.AbstractReactiveSession;
32-
import org.neo4j.driver.internal.util.Futures;
3330
import org.neo4j.driver.reactivestreams.ReactiveResult;
3431
import org.neo4j.driver.reactivestreams.ReactiveSession;
3532
import org.neo4j.driver.reactivestreams.ReactiveTransaction;
3633
import org.neo4j.driver.reactivestreams.ReactiveTransactionCallback;
3734
import org.reactivestreams.Publisher;
38-
import reactor.core.publisher.Mono;
3935

4036
public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
4137
implements ReactiveSession, BaseReactiveQueryRunner {
@@ -83,30 +79,7 @@ public Publisher<ReactiveResult> run(Query query) {
8379

8480
@Override
8581
public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
86-
CompletionStage<RxResultCursor> cursorStage;
87-
try {
88-
cursorStage = session.runRx(query, config);
89-
} catch (Throwable t) {
90-
cursorStage = Futures.failedFuture(t);
91-
}
92-
93-
return Mono.fromCompletionStage(cursorStage)
94-
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
95-
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
96-
.then(Mono.error(error)))
97-
.flatMap(cursor -> {
98-
Mono<RxResultCursor> publisher;
99-
var runError = cursor.getRunError();
100-
if (runError != null) {
101-
publisher = Mono.fromCompletionStage(session.releaseConnectionAsync())
102-
.onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError))
103-
.then(Mono.error(runError));
104-
} else {
105-
publisher = Mono.just(cursor);
106-
}
107-
return publisher;
108-
})
109-
.map(InternalReactiveResult::new);
82+
return run(query, config, InternalReactiveResult::new);
11083
}
11184

11285
@Override

0 commit comments

Comments
 (0)