diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java index 7683e35059..f87f149898 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java @@ -26,7 +26,6 @@ import org.neo4j.driver.BaseSession; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; -import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; @@ -71,10 +70,18 @@ default Publisher beginTransaction() { * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param the return type of the given unit of work. @@ -91,10 +98,18 @@ default Publisher executeRead(ReactiveTransactionCallback - * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param config configuration for all transactions started to execute the unit of work. @@ -111,10 +126,18 @@ Publisher executeRead( * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param the return type of the given unit of work. @@ -131,10 +154,18 @@ default Publisher executeWrite(ReactiveTransactionCallback - * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param config configuration for all transactions started to execute the unit of work. diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java index bfdf9c176a..ce761a08e0 100644 --- a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java @@ -25,7 +25,6 @@ import org.neo4j.driver.BaseSession; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; -import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; @@ -71,10 +70,18 @@ default Publisher beginTransaction() { * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param the return type of the given unit of work. @@ -91,10 +98,18 @@ default Publisher executeRead(ReactiveTransactionCallback - * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param config configuration for all transactions started to execute the unit of work. @@ -111,10 +126,18 @@ Publisher executeRead( * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param the return type of the given unit of work. @@ -131,10 +154,18 @@ default Publisher executeWrite(ReactiveTransactionCallback - * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction. *

* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a * different thread. + *

+ * The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its + * signals via the resulting publisher. If the supplied publisher emits a + * {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the + * provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the + * resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a + * retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3, + * v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4]. * * @param callback the callback representing the unit of work. * @param config configuration for all transactions started to execute the unit of work. diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java index fd15054ce7..767b302229 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -42,6 +44,7 @@ import org.neo4j.driver.Config; import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveSession; @@ -172,6 +175,30 @@ protected void hookOnSubscribe(Subscription subscription) { } } + @Test + void shouldEmitAllSuccessfullyEmittedValues() { + @SuppressWarnings("resource") + var session = neo4j.driver().session(ReactiveSession.class); + var succeed = new AtomicBoolean(); + var numbers = flowPublisherToFlux(session.executeRead(tx -> { + var numbersFlux = flowPublisherToFlux(tx.run("UNWIND range(0, 5) AS x RETURN x")) + .flatMap(result -> flowPublisherToFlux(result.records())) + .map(record -> record.get("x").asInt()); + return succeed.getAndSet(true) + ? publisherToFlowPublisher(numbersFlux) + : publisherToFlowPublisher(numbersFlux.handle((value, sink) -> { + if (value == 2) { + sink.error(new ServiceUnavailableException("simulated")); + } else { + sink.next(value); + } + })); + })) + .collectList() + .block(); + assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), numbers); + } + static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of( diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java index eb82c25484..eb8e84e4bb 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -40,6 +41,7 @@ import org.neo4j.driver.Config; import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; @@ -157,6 +159,30 @@ protected void hookOnSubscribe(Subscription subscription) { } } + @Test + void shouldEmitAllSuccessfullyEmittedValues() { + @SuppressWarnings("resource") + var session = neo4j.driver().session(ReactiveSession.class); + var succeed = new AtomicBoolean(); + var numbers = Flux.from(session.executeRead(tx -> { + var numbersFlux = Mono.from(tx.run("UNWIND range(0, 5) AS x RETURN x")) + .flatMapMany(ReactiveResult::records) + .map(record -> record.get("x").asInt()); + return succeed.getAndSet(true) + ? numbersFlux + : numbersFlux.handle((value, sink) -> { + if (value == 2) { + sink.error(new ServiceUnavailableException("simulated")); + } else { + sink.next(value); + } + }); + })) + .collectList() + .block(); + assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), numbers); + } + static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of(