Skip to content

Commit e21be5c

Browse files
authored
Update the new reactive API to use Flow API (#1295)
The new reactive API should use Java Flow API, the legacy deprecated reactive API stays on Reactive Streams API.
1 parent 8c54468 commit e21be5c

36 files changed

+205
-117
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public AbstractReactiveSession(NetworkSession session) {
4949

5050
abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
5151

52-
public Publisher<S> beginTransaction(TransactionConfig config) {
52+
Publisher<S> doBeginTransaction(TransactionConfig config) {
5353
return createSingleItemPublisher(
5454
() -> {
5555
CompletableFuture<S> txFuture = new CompletableFuture<>();
@@ -115,7 +115,7 @@ public Set<Bookmark> lastBookmarks() {
115115
return session.lastBookmarks();
116116
}
117117

118-
public <T> Publisher<T> close() {
118+
<T> Publisher<T> doClose() {
119119
return createEmptyPublisher(session::closeAsync);
120120
}
121121
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
3131
this.tx = tx;
3232
}
3333

34-
public <T> Publisher<T> commit() {
34+
<T> Publisher<T> doCommit() {
3535
return createEmptyPublisher(tx::commitAsync);
3636
}
3737

38-
public <T> Publisher<T> rollback() {
38+
<T> Publisher<T> doRollback() {
3939
return createEmptyPublisher(tx::rollbackAsync);
4040
}
4141

42-
public Publisher<Void> close() {
42+
Publisher<Void> doClose() {
4343
return close(false);
4444
}
4545

46-
public Publisher<Boolean> isOpen() {
46+
Publisher<Boolean> doIsOpen() {
4747
return Mono.just(tx.isOpen());
4848
}
4949

driver/src/main/java/org/neo4j/driver/internal/reactive/BaseReactiveQueryRunner.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
22+
2123
import java.util.Map;
24+
import java.util.concurrent.Flow.Publisher;
2225
import org.neo4j.driver.Query;
2326
import org.neo4j.driver.Record;
2427
import org.neo4j.driver.Value;
@@ -27,7 +30,6 @@
2730
import org.neo4j.driver.internal.value.MapValue;
2831
import org.neo4j.driver.reactive.ReactiveQueryRunner;
2932
import org.neo4j.driver.reactive.ReactiveResult;
30-
import org.reactivestreams.Publisher;
3133
import reactor.core.publisher.Mono;
3234

3335
interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
@@ -37,7 +39,7 @@ default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
3739
Query query = new Query(queryStr, parameters);
3840
return run(query);
3941
} catch (Throwable t) {
40-
return Mono.error(t);
42+
return publisherToFlowPublisher(Mono.error(t));
4143
}
4244
}
4345

@@ -57,7 +59,7 @@ default Publisher<ReactiveResult> run(String queryStr) {
5759
Query query = new Query(queryStr);
5860
return run(query);
5961
} catch (Throwable t) {
60-
return Mono.error(t);
62+
return publisherToFlowPublisher(Mono.error(t));
6163
}
6264
}
6365

driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
package org.neo4j.driver.internal.reactive;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.Flow.Publisher;
2223
import org.neo4j.driver.Query;
2324
import org.neo4j.driver.Record;
2425
import org.neo4j.driver.Value;
2526
import org.neo4j.driver.reactive.ReactiveResult;
2627
import org.neo4j.driver.reactive.ReactiveTransaction;
2728
import org.neo4j.driver.reactive.ReactiveTransactionContext;
28-
import org.reactivestreams.Publisher;
2929

3030
final class DelegatingReactiveTransactionContext implements ReactiveTransactionContext {
3131
private final ReactiveTransaction delegate;

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveResult.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@
1919
package org.neo4j.driver.internal.reactive;
2020

2121
import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
22+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
2223
import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE;
2324

2425
import java.util.List;
26+
import java.util.concurrent.Flow.Publisher;
2527
import java.util.function.BiConsumer;
2628
import org.neo4j.driver.Record;
2729
import org.neo4j.driver.internal.cursor.RxResultCursor;
2830
import org.neo4j.driver.internal.util.Futures;
2931
import org.neo4j.driver.reactive.ReactiveResult;
3032
import org.neo4j.driver.summary.ResultSummary;
31-
import org.reactivestreams.Publisher;
3233
import reactor.core.publisher.Flux;
3334
import reactor.core.publisher.FluxSink;
3435
import reactor.core.publisher.Mono;
@@ -47,7 +48,7 @@ public List<String> keys() {
4748

4849
@Override
4950
public Publisher<Record> records() {
50-
return Flux.create(
51+
return publisherToFlowPublisher(Flux.create(
5152
sink -> {
5253
if (cursor.isDone()) {
5354
sink.error(newResultConsumedError());
@@ -57,24 +58,25 @@ public Publisher<Record> records() {
5758
sink.onRequest(cursor::request);
5859
}
5960
},
60-
IGNORE);
61+
IGNORE));
6162
}
6263

6364
@Override
6465
public Publisher<ResultSummary> consume() {
65-
return Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
66-
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
67-
if (summary != null) {
68-
sink.success(summary);
69-
} else {
70-
sink.error(error);
71-
}
72-
}));
66+
return publisherToFlowPublisher(
67+
Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
68+
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
69+
if (summary != null) {
70+
sink.success(summary);
71+
} else {
72+
sink.error(error);
73+
}
74+
})));
7375
}
7476

7577
@Override
7678
public Publisher<Boolean> isOpen() {
77-
return Mono.just(!cursor.isDone());
79+
return publisherToFlowPublisher(Mono.just(!cursor.isDone()));
7880
}
7981

8082
/**

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
22+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
23+
2124
import java.util.HashSet;
2225
import java.util.Set;
2326
import java.util.concurrent.CompletionStage;
27+
import java.util.concurrent.Flow.Publisher;
2428
import org.neo4j.driver.AccessMode;
2529
import org.neo4j.driver.Bookmark;
2630
import org.neo4j.driver.Query;
@@ -33,7 +37,6 @@
3337
import org.neo4j.driver.reactive.ReactiveSession;
3438
import org.neo4j.driver.reactive.ReactiveTransaction;
3539
import org.neo4j.driver.reactive.ReactiveTransactionCallback;
36-
import org.reactivestreams.Publisher;
3740
import reactor.core.publisher.Mono;
3841

3942
public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
@@ -48,22 +51,31 @@ ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction)
4851
}
4952

5053
@Override
51-
Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
54+
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
5255
return ((InternalReactiveTransaction) transaction).close(commit);
5356
}
5457

58+
@Override
59+
public Publisher<ReactiveTransaction> beginTransaction(TransactionConfig config) {
60+
return publisherToFlowPublisher(doBeginTransaction(config));
61+
}
62+
5563
@Override
5664
public <T> Publisher<T> executeRead(
5765
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
58-
return runTransaction(
59-
AccessMode.READ, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
66+
return publisherToFlowPublisher(runTransaction(
67+
AccessMode.READ,
68+
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
69+
config));
6070
}
6171

6272
@Override
6373
public <T> Publisher<T> executeWrite(
6474
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
65-
return runTransaction(
66-
AccessMode.WRITE, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
75+
return publisherToFlowPublisher(runTransaction(
76+
AccessMode.WRITE,
77+
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
78+
config));
6779
}
6880

6981
@Override
@@ -80,7 +92,7 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
8092
cursorStage = Futures.failedFuture(t);
8193
}
8294

83-
return Mono.fromCompletionStage(cursorStage)
95+
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
8496
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
8597
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
8698
.then(Mono.error(error)))
@@ -96,11 +108,16 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
96108
}
97109
return publisher;
98110
})
99-
.map(InternalReactiveResult::new);
111+
.map(InternalReactiveResult::new));
100112
}
101113

102114
@Override
103115
public Set<Bookmark> lastBookmarks() {
104116
return new HashSet<>(session.lastBookmarks());
105117
}
118+
119+
@Override
120+
public <T> Publisher<T> close() {
121+
return publisherToFlowPublisher(doClose());
122+
}
106123
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
22+
2123
import java.util.concurrent.CompletionStage;
24+
import java.util.concurrent.Flow.Publisher;
2225
import org.neo4j.driver.Query;
2326
import org.neo4j.driver.internal.async.UnmanagedTransaction;
2427
import org.neo4j.driver.internal.cursor.RxResultCursor;
2528
import org.neo4j.driver.internal.util.Futures;
2629
import org.neo4j.driver.reactive.ReactiveResult;
2730
import org.neo4j.driver.reactive.ReactiveTransaction;
28-
import org.reactivestreams.Publisher;
2931
import reactor.core.publisher.Mono;
3032

3133
public class InternalReactiveTransaction extends AbstractReactiveTransaction
@@ -43,7 +45,7 @@ public Publisher<ReactiveResult> run(Query query) {
4345
cursorStage = Futures.failedFuture(t);
4446
}
4547

46-
return Mono.fromCompletionStage(cursorStage)
48+
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
4749
.flatMap(cursor -> {
4850
Mono<RxResultCursor> publisher;
4951
Throwable runError = cursor.getRunError();
@@ -55,7 +57,7 @@ public Publisher<ReactiveResult> run(Query query) {
5557
}
5658
return publisher;
5759
})
58-
.map(InternalReactiveResult::new);
60+
.map(InternalReactiveResult::new));
5961
}
6062

6163
/**
@@ -66,6 +68,26 @@ public Publisher<ReactiveResult> run(Query query) {
6668
* @return {@code RESET} response publisher
6769
*/
6870
public Publisher<Void> interrupt() {
69-
return Mono.fromCompletionStage(tx.interruptAsync());
71+
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
72+
}
73+
74+
@Override
75+
public <T> Publisher<T> commit() {
76+
return publisherToFlowPublisher(doCommit());
77+
}
78+
79+
@Override
80+
public <T> Publisher<T> rollback() {
81+
return publisherToFlowPublisher(doRollback());
82+
}
83+
84+
@Override
85+
public Publisher<Void> close() {
86+
return publisherToFlowPublisher(doClose());
87+
}
88+
89+
@Override
90+
public Publisher<Boolean> isOpen() {
91+
return publisherToFlowPublisher(doIsOpen());
7092
}
7193
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
22-
2321
import java.util.Map;
2422
import java.util.concurrent.CompletableFuture;
2523
import org.neo4j.driver.AccessMode;
@@ -54,6 +52,11 @@ Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
5452
return ((InternalRxTransaction) transaction).close(commit);
5553
}
5654

55+
@Override
56+
public Publisher<RxTransaction> beginTransaction(TransactionConfig config) {
57+
return doBeginTransaction(config);
58+
}
59+
5760
@Override
5861
public <T> Publisher<T> readTransaction(RxTransactionWork<? extends Publisher<T>> work) {
5962
return readTransaction(work, TransactionConfig.empty());
@@ -128,6 +131,6 @@ public Bookmark lastBookmark() {
128131

129132
@Override
130133
public <T> Publisher<T> close() {
131-
return createEmptyPublisher(session::closeAsync);
134+
return doClose();
132135
}
133136
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.neo4j.driver.internal.util.Futures;
2626
import org.neo4j.driver.reactive.RxResult;
2727
import org.neo4j.driver.reactive.RxTransaction;
28+
import org.reactivestreams.Publisher;
2829

2930
@Deprecated
3031
public class InternalRxTransaction extends AbstractReactiveTransaction implements RxTransaction {
@@ -53,4 +54,24 @@ public RxResult run(Query query) {
5354
return cursorFuture;
5455
});
5556
}
57+
58+
@Override
59+
public <T> Publisher<T> commit() {
60+
return doCommit();
61+
}
62+
63+
@Override
64+
public <T> Publisher<T> rollback() {
65+
return doRollback();
66+
}
67+
68+
@Override
69+
public Publisher<Void> close() {
70+
return doClose();
71+
}
72+
73+
@Override
74+
public Publisher<Boolean> isOpen() {
75+
return doIsOpen();
76+
}
5677
}

driver/src/main/java/org/neo4j/driver/reactive/ReactiveQueryRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package org.neo4j.driver.reactive;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.Flow.Publisher;
2223
import org.neo4j.driver.Query;
2324
import org.neo4j.driver.Record;
2425
import org.neo4j.driver.Value;
2526
import org.neo4j.driver.Values;
26-
import org.reactivestreams.Publisher;
2727

2828
/**
2929
* Common interface for components that can execute Neo4j queries using Reactive API.

driver/src/main/java/org/neo4j/driver/reactive/ReactiveResult.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
package org.neo4j.driver.reactive;
2020

2121
import java.util.List;
22+
import java.util.concurrent.Flow.Publisher;
23+
import java.util.concurrent.Flow.Subscriber;
24+
import java.util.concurrent.Flow.Subscription;
2225
import org.neo4j.driver.Query;
2326
import org.neo4j.driver.Record;
2427
import org.neo4j.driver.exceptions.ResultConsumedException;
2528
import org.neo4j.driver.summary.ResultSummary;
26-
import org.reactivestreams.Publisher;
27-
import org.reactivestreams.Subscriber;
28-
import org.reactivestreams.Subscription;
2929

3030
/**
3131
* A reactive result provides a reactive way to execute query on the server and receives records back. This reactive result consists of a result key publisher,

0 commit comments

Comments
 (0)