Skip to content

Commit 791f79f

Browse files
committed
Add support for cancellation on reactive session run
This update brings support for cancelling subscription on reactive session `run` methods, like: - `org.neo4j.driver.reactive.ReactiveSession.run(..)` - `org.neo4j.driver.reactivestreams.ReactiveSession.run(..)` If subscription is cancelled before an instance of the `ReactiveResult` is published, the driver will rollback the initiated query by sending the Bolt `RESET` message and will release the network connection back to its connection pool. This will be done in the background. Once the `ReactiveResult` is published, the cancellation with the rollback is no longer possible. In addition, this update makes the driver wait until subscriber subscribes to the publisher before dispatching the query to the server. Previously, the driver would do that immediately upon calling the `run` method.
1 parent b32717a commit 791f79f

File tree

2 files changed

+21
-81
lines changed

2 files changed

+21
-81
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 20 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
2222
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
2323

24-
import java.util.Optional;
2524
import java.util.Set;
2625
import java.util.concurrent.CompletableFuture;
2726
import java.util.concurrent.CompletionException;
@@ -151,65 +150,27 @@ public Set<Bookmark> lastBookmarks() {
151150
protected <T> Publisher<T> run(Query query, TransactionConfig config, Function<RxResultCursor, T> cursorToResult) {
152151
var cursorPublishFuture = new CompletableFuture<RxResultCursor>();
153152
var cursorReference = new AtomicReference<RxResultCursor>();
154-
return Mono.<T>create(sink -> {
155-
var state = new SinkState<T>();
156-
sink.onRequest(ignored -> {
157-
CompletionStage<T> stage;
158-
synchronized (state) {
159-
if (state.isCancelled()) {
160-
return;
161-
}
162-
if (state.getStage() != null) {
163-
return;
164-
}
165-
stage = runAsStage(query, config, cursorPublishFuture)
166-
.thenApply(cursor -> {
167-
cursorReference.set(cursor);
168-
return cursor;
169-
})
170-
.thenApply(cursorToResult);
171-
state.setStage(stage);
172-
}
173-
stage.whenComplete((item, completionError) -> {
174-
if (completionError == null) {
175-
if (item != null) {
176-
sink.success(item);
177-
} else {
178-
sink.error(
179-
new IllegalStateException(
180-
"Unexpected condition, run call has completed successfully with result being null"));
181-
}
182-
} else {
183-
var error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
184-
.orElse(completionError);
185-
sink.error(error);
186-
}
187-
});
188-
});
189-
sink.onCancel(() -> {
190-
CompletionStage<T> stage;
191-
synchronized (state) {
192-
if (state.isCancelled()) {
193-
return;
153+
154+
return createSingleItemPublisher(
155+
() -> runAsStage(query, config, cursorPublishFuture)
156+
.thenApply(cursor -> {
157+
cursorReference.set(cursor);
158+
return cursor;
159+
})
160+
.thenApply(cursorToResult),
161+
() -> new IllegalStateException(
162+
"Unexpected condition, run call has completed successfully with result being null"),
163+
value -> {
164+
if (value != null) {
165+
cursorReference.get().rollback().whenComplete((unused, throwable) -> {
166+
if (throwable != null) {
167+
cursorPublishFuture.completeExceptionally(throwable);
168+
} else {
169+
cursorPublishFuture.complete(null);
170+
}
171+
});
194172
}
195-
state.setCancelled(true);
196-
stage = state.getStage();
197-
}
198-
if (stage != null) {
199-
stage.whenComplete((value, ignored) -> {
200-
if (value != null) {
201-
cursorReference.get().rollback().whenComplete((unused, throwable) -> {
202-
if (throwable != null) {
203-
cursorPublishFuture.completeExceptionally(throwable);
204-
} else {
205-
cursorPublishFuture.complete(null);
206-
}
207-
});
208-
}
209-
});
210-
}
211-
});
212-
})
173+
})
213174
.doOnNext(value -> cursorPublishFuture.complete(cursorReference.get()))
214175
.doOnError(cursorPublishFuture::completeExceptionally);
215176
}
@@ -256,25 +217,4 @@ private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable throwable)
256217
protected <T> Publisher<T> doClose() {
257218
return createEmptyPublisher(session::closeAsync);
258219
}
259-
260-
private static class SinkState<T> {
261-
private CompletionStage<T> stage;
262-
private boolean cancelled;
263-
264-
public CompletionStage<T> getStage() {
265-
return stage;
266-
}
267-
268-
public void setStage(CompletionStage<T> stage) {
269-
this.stage = stage;
270-
}
271-
272-
public boolean isCancelled() {
273-
return cancelled;
274-
}
275-
276-
public void setCancelled(boolean cancelled) {
277-
this.cancelled = cancelled;
278-
}
279-
}
280220
}

driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static <T> Publisher<T> createEmptyPublisher(Supplier<CompletionStage<Voi
5656
* @param <T> the type of the item to publish.
5757
* @return A publisher that succeeds exactly one item or fails with an error.
5858
*/
59-
public static <T> Publisher<T> createSingleItemPublisher(
59+
public static <T> Mono<T> createSingleItemPublisher(
6060
Supplier<CompletionStage<T>> supplier,
6161
Supplier<Throwable> nullResultThrowableSupplier,
6262
Consumer<T> cancellationHandler) {

0 commit comments

Comments
 (0)