Skip to content

Add more details to reactive managed transaction docs #1462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.driver.BaseSession;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Values;
Expand Down Expand Up @@ -71,10 +70,18 @@ default Publisher<ReactiveTransaction> beginTransaction() {
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param <T> the return type of the given unit of work.
Expand All @@ -91,10 +98,18 @@ default <T> Publisher<T> executeRead(ReactiveTransactionCallback<? extends Publi
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param config configuration for all transactions started to execute the unit of work.
Expand All @@ -111,10 +126,18 @@ <T> Publisher<T> executeRead(
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param <T> the return type of the given unit of work.
Expand All @@ -131,10 +154,18 @@ default <T> Publisher<T> executeWrite(ReactiveTransactionCallback<? extends Publ
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param config configuration for all transactions started to execute the unit of work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.neo4j.driver.BaseSession;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Values;
Expand Down Expand Up @@ -71,10 +70,18 @@ default Publisher<ReactiveTransaction> beginTransaction() {
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param <T> the return type of the given unit of work.
Expand All @@ -91,10 +98,18 @@ default <T> Publisher<T> executeRead(ReactiveTransactionCallback<? extends Publi
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param config configuration for all transactions started to execute the unit of work.
Expand All @@ -111,10 +126,18 @@ <T> Publisher<T> executeRead(
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param <T> the return type of the given unit of work.
Expand All @@ -131,10 +154,18 @@ default <T> Publisher<T> executeWrite(ReactiveTransactionCallback<? extends Publ
* The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will
* result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver.
* <p>
* The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction.
* The provided unit of work should not return {@link ReactiveResult} object as it won't be valid outside the scope of the transaction.
* <p>
* It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a
* different thread.
* <p>
* The driver uses the provided {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} to get a publisher and emits its
* signals via the resulting publisher. If the supplied publisher emits a
* {@link org.neo4j.driver.exceptions.RetryableException} and the driver is in a position to retry, it calls the
* provided callback again to get a new publisher and attempts to stream its signals. In case of retries, the
* resulting publisher contains the successfully emitted values from all retry attempts. For instance, if a
* retryable exception occurs after streaming values [v1, v2, v3] and a successful retry emits values [v1, v2, v3,
* v4] then the resulting publisher emits the following values: [v1, v2, v3, v1, v2, v3, v4].
*
* @param callback the callback representing the unit of work.
* @param config configuration for all transactions started to execute the unit of work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
Expand All @@ -42,6 +44,7 @@
import org.neo4j.driver.Config;
import org.neo4j.driver.ConnectionPoolMetrics;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveSession;
Expand Down Expand Up @@ -172,6 +175,30 @@ protected void hookOnSubscribe(Subscription subscription) {
}
}

@Test
void shouldEmitAllSuccessfullyEmittedValues() {
@SuppressWarnings("resource")
var session = neo4j.driver().session(ReactiveSession.class);
var succeed = new AtomicBoolean();
var numbers = flowPublisherToFlux(session.executeRead(tx -> {
var numbersFlux = flowPublisherToFlux(tx.run("UNWIND range(0, 5) AS x RETURN x"))
.flatMap(result -> flowPublisherToFlux(result.records()))
.map(record -> record.get("x").asInt());
return succeed.getAndSet(true)
? publisherToFlowPublisher(numbersFlux)
: publisherToFlowPublisher(numbersFlux.handle((value, sink) -> {
if (value == 2) {
sink.error(new ServiceUnavailableException("simulated"));
} else {
sink.next(value);
}
}));
}))
.collectList()
.block();
assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), numbers);
}

static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
managedTransactionsReturningReactiveResultPublisher() {
return List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.driver.Config;
import org.neo4j.driver.ConnectionPoolMetrics;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
Expand Down Expand Up @@ -157,6 +159,30 @@ protected void hookOnSubscribe(Subscription subscription) {
}
}

@Test
void shouldEmitAllSuccessfullyEmittedValues() {
@SuppressWarnings("resource")
var session = neo4j.driver().session(ReactiveSession.class);
var succeed = new AtomicBoolean();
var numbers = Flux.from(session.executeRead(tx -> {
var numbersFlux = Mono.from(tx.run("UNWIND range(0, 5) AS x RETURN x"))
.flatMapMany(ReactiveResult::records)
.map(record -> record.get("x").asInt());
return succeed.getAndSet(true)
? numbersFlux
: numbersFlux.handle((value, sink) -> {
if (value == 2) {
sink.error(new ServiceUnavailableException("simulated"));
} else {
sink.next(value);
}
});
}))
.collectList()
.block();
assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), numbers);
}

static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
managedTransactionsReturningReactiveResultPublisher() {
return List.of(
Expand Down