Skip to content

Commit b32717a

Browse files
committed
Initial impl
1 parent e2c60a1 commit b32717a

File tree

15 files changed

+405
-77
lines changed

15 files changed

+405
-77
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,13 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
122122
.thenApply(cursor -> cursor); // convert the return type
123123
}
124124

125-
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config) {
125+
public CompletionStage<RxResultCursor> runRx(
126+
Query query, TransactionConfig config, CompletionStage<RxResultCursor> cursorPublishStage) {
126127
var newResultCursorStage = buildResultCursorFactory(query, config).thenCompose(ResultCursorFactory::rxResult);
127128

128-
resultCursorStage = newResultCursorStage.exceptionally(error -> null);
129+
resultCursorStage = newResultCursorStage
130+
.thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage)
131+
.exceptionally(throwable -> null);
129132
return newResultCursorStage;
130133
}
131134

driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public CompletionStage<AsyncResultCursor> asyncResult() {
7474
@Override
7575
public CompletionStage<RxResultCursor> rxResult() {
7676
connection.writeAndFlush(runMessage, runHandler);
77-
return runFuture.handle((ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler));
77+
return runFuture.handle(
78+
(ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler, connection::release));
7879
}
7980
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,13 @@ public interface RxResultCursor extends Subscription, FailableCursor {
3636
boolean isDone();
3737

3838
Throwable getRunError();
39+
40+
/**
41+
* Rolls back this instance by releasing connection with RESET.
42+
* <p>
43+
* This must never be called on a published instance.
44+
* @return reset completion stage
45+
* @since 5.11
46+
*/
47+
CompletionStage<Void> rollback();
3948
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionStage;
3030
import java.util.function.BiConsumer;
31+
import java.util.function.Supplier;
3132
import org.neo4j.driver.Record;
3233
import org.neo4j.driver.exceptions.TransactionNestingException;
3334
import org.neo4j.driver.internal.handlers.RunResponseHandler;
@@ -41,23 +42,30 @@ public class RxResultCursorImpl implements RxResultCursor {
4142
private final RunResponseHandler runHandler;
4243
private final PullResponseHandler pullHandler;
4344
private final Throwable runResponseError;
45+
private final Supplier<CompletionStage<Void>> connectionReleaseSupplier;
4446
private boolean runErrorSurfaced;
4547
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
4648
private boolean summaryFutureExposed;
4749
private boolean resultConsumed;
4850
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
4951

52+
// for testing only
5053
public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) {
51-
this(null, runHandler, pullHandler);
54+
this(null, runHandler, pullHandler, () -> CompletableFuture.completedFuture(null));
5255
}
5356

54-
public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler) {
57+
public RxResultCursorImpl(
58+
Throwable runError,
59+
RunResponseHandler runHandler,
60+
PullResponseHandler pullHandler,
61+
Supplier<CompletionStage<Void>> connectionReleaseSupplier) {
5562
Objects.requireNonNull(runHandler);
5663
Objects.requireNonNull(pullHandler);
5764

5865
this.runResponseError = runError;
5966
this.runHandler = runHandler;
6067
this.pullHandler = pullHandler;
68+
this.connectionReleaseSupplier = connectionReleaseSupplier;
6169
installSummaryConsumer();
6270
}
6371

@@ -130,6 +138,12 @@ public Throwable getRunError() {
130138
return runResponseError;
131139
}
132140

141+
@Override
142+
public CompletionStage<Void> rollback() {
143+
summaryFuture.complete(null);
144+
return connectionReleaseSupplier.get();
145+
}
146+
133147
public CompletionStage<ResultSummary> summaryStage() {
134148
if (!isDone() && !resultConsumed) // the summary is called before record streaming
135149
{

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,22 @@
2121
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
2222
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
2323

24+
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionException;
28+
import java.util.concurrent.CompletionStage;
29+
import java.util.concurrent.atomic.AtomicReference;
2630
import java.util.function.Function;
2731
import org.neo4j.driver.AccessMode;
2832
import org.neo4j.driver.Bookmark;
33+
import org.neo4j.driver.Query;
2934
import org.neo4j.driver.TransactionConfig;
3035
import org.neo4j.driver.exceptions.ClientException;
3136
import org.neo4j.driver.exceptions.TransactionNestingException;
3237
import org.neo4j.driver.internal.async.NetworkSession;
3338
import org.neo4j.driver.internal.async.UnmanagedTransaction;
39+
import org.neo4j.driver.internal.cursor.RxResultCursor;
3440
import org.neo4j.driver.internal.util.Futures;
3541
import org.neo4j.driver.reactive.RxResult;
3642
import org.neo4j.driver.reactivestreams.ReactiveResult;
@@ -142,7 +148,133 @@ public Set<Bookmark> lastBookmarks() {
142148
return session.lastBookmarks();
143149
}
144150

151+
protected <T> Publisher<T> run(Query query, TransactionConfig config, Function<RxResultCursor, T> cursorToResult) {
152+
var cursorPublishFuture = new CompletableFuture<RxResultCursor>();
153+
var cursorReference = new AtomicReference<RxResultCursor>();
154+
return Mono.<T>create(sink -> {
155+
var state = new SinkState<T>();
156+
sink.onRequest(ignored -> {
157+
CompletionStage<T> stage;
158+
synchronized (state) {
159+
if (state.isCancelled()) {
160+
return;
161+
}
162+
if (state.getStage() != null) {
163+
return;
164+
}
165+
stage = runAsStage(query, config, cursorPublishFuture)
166+
.thenApply(cursor -> {
167+
cursorReference.set(cursor);
168+
return cursor;
169+
})
170+
.thenApply(cursorToResult);
171+
state.setStage(stage);
172+
}
173+
stage.whenComplete((item, completionError) -> {
174+
if (completionError == null) {
175+
if (item != null) {
176+
sink.success(item);
177+
} else {
178+
sink.error(
179+
new IllegalStateException(
180+
"Unexpected condition, run call has completed successfully with result being null"));
181+
}
182+
} else {
183+
var error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
184+
.orElse(completionError);
185+
sink.error(error);
186+
}
187+
});
188+
});
189+
sink.onCancel(() -> {
190+
CompletionStage<T> stage;
191+
synchronized (state) {
192+
if (state.isCancelled()) {
193+
return;
194+
}
195+
state.setCancelled(true);
196+
stage = state.getStage();
197+
}
198+
if (stage != null) {
199+
stage.whenComplete((value, ignored) -> {
200+
if (value != null) {
201+
cursorReference.get().rollback().whenComplete((unused, throwable) -> {
202+
if (throwable != null) {
203+
cursorPublishFuture.completeExceptionally(throwable);
204+
} else {
205+
cursorPublishFuture.complete(null);
206+
}
207+
});
208+
}
209+
});
210+
}
211+
});
212+
})
213+
.doOnNext(value -> cursorPublishFuture.complete(cursorReference.get()))
214+
.doOnError(cursorPublishFuture::completeExceptionally);
215+
}
216+
217+
private CompletionStage<RxResultCursor> runAsStage(
218+
Query query, TransactionConfig config, CompletionStage<RxResultCursor> finalStage) {
219+
CompletionStage<RxResultCursor> cursorStage;
220+
try {
221+
cursorStage = session.runRx(query, config, finalStage);
222+
} catch (Throwable t) {
223+
cursorStage = Futures.failedFuture(t);
224+
}
225+
226+
return cursorStage
227+
.handle((cursor, throwable) -> {
228+
if (throwable != null) {
229+
return this.<RxResultCursor>releaseConnectionAndRethrow(throwable);
230+
} else {
231+
var runError = cursor.getRunError();
232+
if (runError != null) {
233+
return this.<RxResultCursor>releaseConnectionAndRethrow(runError);
234+
} else {
235+
return CompletableFuture.completedFuture(cursor);
236+
}
237+
}
238+
})
239+
.thenCompose(stage -> stage);
240+
}
241+
242+
private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable throwable) {
243+
return session.releaseConnectionAsync().handle((ignored, releaseThrowable) -> {
244+
if (releaseThrowable != null) {
245+
throw Futures.combineErrors(throwable, releaseThrowable);
246+
} else {
247+
if (throwable instanceof RuntimeException e) {
248+
throw e;
249+
} else {
250+
throw new CompletionException(throwable);
251+
}
252+
}
253+
});
254+
}
255+
145256
protected <T> Publisher<T> doClose() {
146257
return createEmptyPublisher(session::closeAsync);
147258
}
259+
260+
private static class SinkState<T> {
261+
private CompletionStage<T> stage;
262+
private boolean cancelled;
263+
264+
public CompletionStage<T> getStage() {
265+
return stage;
266+
}
267+
268+
public void setStage(CompletionStage<T> stage) {
269+
this.stage = stage;
270+
}
271+
272+
public boolean isCancelled() {
273+
return cancelled;
274+
}
275+
276+
public void setCancelled(boolean cancelled) {
277+
this.cancelled = cancelled;
278+
}
279+
}
148280
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,17 @@
2323

2424
import java.util.HashSet;
2525
import java.util.Set;
26-
import java.util.concurrent.CompletionStage;
2726
import java.util.concurrent.Flow.Publisher;
2827
import org.neo4j.driver.AccessMode;
2928
import org.neo4j.driver.Bookmark;
3029
import org.neo4j.driver.Query;
3130
import org.neo4j.driver.TransactionConfig;
3231
import org.neo4j.driver.internal.async.NetworkSession;
3332
import org.neo4j.driver.internal.async.UnmanagedTransaction;
34-
import org.neo4j.driver.internal.cursor.RxResultCursor;
35-
import org.neo4j.driver.internal.util.Futures;
3633
import org.neo4j.driver.reactive.ReactiveResult;
3734
import org.neo4j.driver.reactive.ReactiveSession;
3835
import org.neo4j.driver.reactive.ReactiveTransaction;
3936
import org.neo4j.driver.reactive.ReactiveTransactionCallback;
40-
import reactor.core.publisher.Mono;
4137

4238
public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
4339
implements ReactiveSession, BaseReactiveQueryRunner {
@@ -89,30 +85,7 @@ public Publisher<ReactiveResult> run(Query query) {
8985

9086
@Override
9187
public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
92-
CompletionStage<RxResultCursor> cursorStage;
93-
try {
94-
cursorStage = session.runRx(query, config);
95-
} catch (Throwable t) {
96-
cursorStage = Futures.failedFuture(t);
97-
}
98-
99-
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
100-
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
101-
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
102-
.then(Mono.error(error)))
103-
.flatMap(cursor -> {
104-
Mono<RxResultCursor> publisher;
105-
var runError = cursor.getRunError();
106-
if (runError != null) {
107-
publisher = Mono.fromCompletionStage(session.releaseConnectionAsync())
108-
.onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError))
109-
.then(Mono.error(runError));
110-
} else {
111-
publisher = Mono.just(cursor);
112-
}
113-
return publisher;
114-
})
115-
.map(InternalReactiveResult::new));
88+
return publisherToFlowPublisher(run(query, config, InternalReactiveResult::new));
11689
}
11790

11891
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public RxResult run(Query query) {
9696
public RxResult run(Query query, TransactionConfig config) {
9797
return new InternalRxResult(() -> {
9898
var resultCursorFuture = new CompletableFuture<RxResultCursor>();
99-
session.runRx(query, config).whenComplete((cursor, completionError) -> {
99+
session.runRx(query, config, resultCursorFuture).whenComplete((cursor, completionError) -> {
100100
if (cursor != null) {
101101
resultCursorFuture.complete(cursor);
102102
} else {

driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,18 @@
2020

2121
import java.util.HashSet;
2222
import java.util.Set;
23-
import java.util.concurrent.CompletionStage;
2423
import org.neo4j.driver.AccessMode;
2524
import org.neo4j.driver.Bookmark;
2625
import org.neo4j.driver.Query;
2726
import org.neo4j.driver.TransactionConfig;
2827
import org.neo4j.driver.internal.async.NetworkSession;
2928
import org.neo4j.driver.internal.async.UnmanagedTransaction;
30-
import org.neo4j.driver.internal.cursor.RxResultCursor;
3129
import org.neo4j.driver.internal.reactive.AbstractReactiveSession;
32-
import org.neo4j.driver.internal.util.Futures;
3330
import org.neo4j.driver.reactivestreams.ReactiveResult;
3431
import org.neo4j.driver.reactivestreams.ReactiveSession;
3532
import org.neo4j.driver.reactivestreams.ReactiveTransaction;
3633
import org.neo4j.driver.reactivestreams.ReactiveTransactionCallback;
3734
import org.reactivestreams.Publisher;
38-
import reactor.core.publisher.Mono;
3935

4036
public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
4137
implements ReactiveSession, BaseReactiveQueryRunner {
@@ -83,30 +79,7 @@ public Publisher<ReactiveResult> run(Query query) {
8379

8480
@Override
8581
public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
86-
CompletionStage<RxResultCursor> cursorStage;
87-
try {
88-
cursorStage = session.runRx(query, config);
89-
} catch (Throwable t) {
90-
cursorStage = Futures.failedFuture(t);
91-
}
92-
93-
return Mono.fromCompletionStage(cursorStage)
94-
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
95-
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
96-
.then(Mono.error(error)))
97-
.flatMap(cursor -> {
98-
Mono<RxResultCursor> publisher;
99-
var runError = cursor.getRunError();
100-
if (runError != null) {
101-
publisher = Mono.fromCompletionStage(session.releaseConnectionAsync())
102-
.onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError))
103-
.then(Mono.error(runError));
104-
} else {
105-
publisher = Mono.just(cursor);
106-
}
107-
return publisher;
108-
})
109-
.map(InternalReactiveResult::new);
82+
return run(query, config, InternalReactiveResult::new);
11083
}
11184

11285
@Override

0 commit comments

Comments
 (0)