Skip to content

Commit 685cbde

Browse files
committed
Add more details to reactive managed transaction docs
1 parent 41ff606 commit 685cbde

File tree

4 files changed

+127
-10
lines changed

4 files changed

+127
-10
lines changed

driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.neo4j.driver.BaseSession;
2727
import org.neo4j.driver.Bookmark;
2828
import org.neo4j.driver.Query;
29-
import org.neo4j.driver.Result;
3029
import org.neo4j.driver.Session;
3130
import org.neo4j.driver.TransactionConfig;
3231
import org.neo4j.driver.Values;
@@ -71,10 +70,18 @@ default Publisher<ReactiveTransaction> beginTransaction() {
7170
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
7271
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
7372
* <p>
74-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
73+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
7574
* <p>
7675
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
7776
* different thread.
77+
* <p>
78+
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
79+
* signals via the resulting publisher. If the supplied publisher emits a
80+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
81+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
82+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
83+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
84+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
7885
*
7986
* @param callback the callback representing the unit of work.
8087
* @param <T> the return type of the given unit of work.
@@ -91,10 +98,18 @@ default <T> Publisher<T> executeRead(ReactiveTransactionCallback<? extends Publi
9198
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
9299
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
93100
* <p>
94-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
101+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
95102
* <p>
96103
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
97104
* different thread.
105+
* <p>
106+
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
107+
* signals via the resulting publisher. If the supplied publisher emits a
108+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
109+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
110+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
111+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
112+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
98113
*
99114
* @param callback the callback representing the unit of work.
100115
* @param config configuration for all transactions started to execute the unit of work.
@@ -111,10 +126,18 @@ <T> Publisher<T> executeRead(
111126
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
112127
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
113128
* <p>
114-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
129+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
115130
* <p>
116131
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
117132
* different thread.
133+
* <p>
134+
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
135+
* signals via the resulting publisher. If the supplied publisher emits a
136+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
137+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
138+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
139+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
140+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
118141
*
119142
* @param callback the callback representing the unit of work.
120143
* @param <T> the return type of the given unit of work.
@@ -131,10 +154,18 @@ default <T> Publisher<T> executeWrite(ReactiveTransactionCallback<? extends Publ
131154
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
132155
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
133156
* <p>
134-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
157+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
135158
* <p>
136159
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
137160
* different thread.
161+
* <p>
162+
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
163+
* signals via the resulting publisher. If the supplied publisher emits a
164+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
165+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
166+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
167+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
168+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
138169
*
139170
* @param callback the callback representing the unit of work.
140171
* @param config configuration for all transactions started to execute the unit of work.

driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.neo4j.driver.BaseSession;
2626
import org.neo4j.driver.Bookmark;
2727
import org.neo4j.driver.Query;
28-
import org.neo4j.driver.Result;
2928
import org.neo4j.driver.Session;
3029
import org.neo4j.driver.TransactionConfig;
3130
import org.neo4j.driver.Values;
@@ -71,10 +70,18 @@ default Publisher<ReactiveTransaction> beginTransaction() {
7170
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
7271
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
7372
* <p>
74-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
73+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
7574
* <p>
7675
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
7776
* different thread.
77+
* <p>
78+
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
79+
* signals via the resulting publisher. If the supplied publisher emits a
80+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
81+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
82+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
83+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
84+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
7885
*
7986
* @param callback the callback representing the unit of work.
8087
* @param <T> the return type of the given unit of work.
@@ -91,10 +98,18 @@ default <T> Publisher<T> executeRead(ReactiveTransactionCallback<? extends Publi
9198
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
9299
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
93100
* <p>
94-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
101+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
95102
* <p>
96103
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
97104
* different thread.
105+
* <p>
106+
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
107+
* signals via the resulting publisher. If the supplied publisher emits a
108+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
109+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
110+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
111+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
112+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
98113
*
99114
* @param callback the callback representing the unit of work.
100115
* @param config configuration for all transactions started to execute the unit of work.
@@ -111,10 +126,18 @@ <T> Publisher<T> executeRead(
111126
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
112127
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
113128
* <p>
114-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
129+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
115130
* <p>
116131
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
117132
* different thread.
133+
* <p>
134+
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
135+
* signals via the resulting publisher. If the supplied publisher emits a
136+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
137+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
138+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
139+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
140+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
118141
*
119142
* @param callback the callback representing the unit of work.
120143
* @param <T> the return type of the given unit of work.
@@ -131,10 +154,18 @@ default <T> Publisher<T> executeWrite(ReactiveTransactionCallback<? extends Publ
131154
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
132155
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
133156
* <p>
134-
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
157+
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
135158
* <p>
136159
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
137160
* different thread.
161+
* <p>
162+
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
163+
* signals via the resulting publisher. If the supplied publisher emits a
164+
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
165+
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
166+
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
167+
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
168+
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
138169
*
139170
* @param callback the callback representing the unit of work.
140171
* @param config configuration for all transactions started to execute the unit of work.

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.jupiter.api.Assertions.fail;
2424
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
2525
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
26+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
2627

2728
import java.time.Instant;
2829
import java.time.temporal.ChronoUnit;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.Executors;
3435
import java.util.concurrent.Flow;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.function.Function;
3638
import java.util.stream.IntStream;
3739
import org.junit.jupiter.api.Test;
@@ -42,6 +44,7 @@
4244
import org.neo4j.driver.Config;
4345
import org.neo4j.driver.ConnectionPoolMetrics;
4446
import org.neo4j.driver.exceptions.ClientException;
47+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4548
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
4649
import org.neo4j.driver.reactive.ReactiveResult;
4750
import org.neo4j.driver.reactive.ReactiveSession;
@@ -172,6 +175,31 @@ protected void hookOnSubscribe(Subscription subscription) {
172175
}
173176
}
174177

178+
@Test
179+
void shouldEmitAllSuccessfullyEmittedValues() {
180+
try (var driver = neo4j.driver()) {
181+
var session = driver.session(ReactiveSession.class);
182+
var succeed = new AtomicBoolean();
183+
var numbers = flowPublisherToFlux(session.executeRead(tx -> {
184+
var numbersFlux = flowPublisherToFlux(tx.run("UNWIND range(0, 3) AS x RETURN x"))
185+
.flatMap(result -> flowPublisherToFlux(result.records()))
186+
.map(record -> record.get("x").asInt());
187+
return succeed.getAndSet(true)
188+
? publisherToFlowPublisher(numbersFlux)
189+
: publisherToFlowPublisher(numbersFlux.handle((value, sink) -> {
190+
if (value == 2) {
191+
sink.error(new ServiceUnavailableException("simulated"));
192+
} else {
193+
sink.next(value);
194+
}
195+
}));
196+
}))
197+
.collectList()
198+
.block();
199+
assertEquals(List.of(0, 1, 0, 1, 2, 3), numbers);
200+
}
201+
}
202+
175203
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
176204
managedTransactionsReturningReactiveResultPublisher() {
177205
return List.of(

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.UUID;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.Executors;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Function;
3435
import java.util.stream.IntStream;
3536
import org.junit.jupiter.api.Test;
@@ -40,6 +41,7 @@
4041
import org.neo4j.driver.Config;
4142
import org.neo4j.driver.ConnectionPoolMetrics;
4243
import org.neo4j.driver.exceptions.ClientException;
44+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4345
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
4446
import org.neo4j.driver.reactivestreams.ReactiveResult;
4547
import org.neo4j.driver.reactivestreams.ReactiveSession;
@@ -157,6 +159,31 @@ protected void hookOnSubscribe(Subscription subscription) {
157159
}
158160
}
159161

162+
@Test
163+
void shouldEmitAllSuccessfullyEmittedValues() {
164+
try (var driver = neo4j.driver()) {
165+
var session = driver.session(ReactiveSession.class);
166+
var succeed = new AtomicBoolean();
167+
var numbers = Flux.from(session.executeRead(tx -> {
168+
var numbersFlux = Mono.from(tx.run("UNWIND range(0, 3) AS x RETURN x"))
169+
.flatMapMany(ReactiveResult::records)
170+
.map(record -> record.get("x").asInt());
171+
return succeed.getAndSet(true)
172+
? numbersFlux
173+
: numbersFlux.handle((value, sink) -> {
174+
if (value == 2) {
175+
sink.error(new ServiceUnavailableException("simulated"));
176+
} else {
177+
sink.next(value);
178+
}
179+
});
180+
}))
181+
.collectList()
182+
.block();
183+
assertEquals(List.of(0, 1, 0, 1, 2, 3), numbers);
184+
}
185+
}
186+
160187
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
161188
managedTransactionsReturningReactiveResultPublisher() {
162189
return List.of(

0 commit comments

Comments
 (0)