diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index be0d7648dd..69073d6051 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.reactive; -import org.neo4j.driver.Query; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -26,8 +25,9 @@ import java.util.concurrent.CompletableFuture; import org.neo4j.driver.AccessMode; -import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; +import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.cursor.RxResultCursor; @@ -38,7 +38,7 @@ import org.neo4j.driver.reactive.RxTransactionWork; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; -import static org.neo4j.driver.internal.reactive.RxUtils.createMono; +import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher; public class InternalRxSession extends AbstractRxQueryRunner implements RxSession { @@ -61,40 +61,46 @@ public Publisher beginTransaction() @Override public Publisher beginTransaction( TransactionConfig config ) { - return createMono( () -> - { - CompletableFuture txFuture = new CompletableFuture<>(); - session.beginTransactionAsync( config ).whenComplete( ( tx, completionError ) -> { - if ( tx != null ) + return createSingleItemPublisher( + () -> { - txFuture.complete( new InternalRxTransaction( tx ) ); - } - else - { - releaseConnectionBeforeReturning( txFuture, completionError ); - } - } ); - return txFuture; - } ); + CompletableFuture txFuture = new CompletableFuture<>(); + session.beginTransactionAsync( config ).whenComplete( + ( tx, completionError ) -> + { + if ( tx != null ) + { + txFuture.complete( new InternalRxTransaction( tx ) ); + } + else + { + releaseConnectionBeforeReturning( txFuture, completionError ); + } + } ); + return txFuture; + }, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) ); } private Publisher beginTransaction( AccessMode mode, TransactionConfig config ) { - return createMono( () -> - { - CompletableFuture txFuture = new CompletableFuture<>(); - session.beginTransactionAsync( mode, config ).whenComplete( ( tx, completionError ) -> { - if ( tx != null ) + return createSingleItemPublisher( + () -> { - txFuture.complete( new InternalRxTransaction( tx ) ); - } - else - { - releaseConnectionBeforeReturning( txFuture, completionError ); - } - } ); - return txFuture; - } ); + CompletableFuture txFuture = new CompletableFuture<>(); + session.beginTransactionAsync( mode, config ).whenComplete( + ( tx, completionError ) -> + { + if ( tx != null ) + { + txFuture.complete( new InternalRxTransaction( tx ) ); + } + else + { + releaseConnectionBeforeReturning( txFuture, completionError ); + } + } ); + return txFuture; + }, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java index 9bbebf8e23..392365e1da 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java @@ -21,6 +21,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.function.Supplier; @@ -49,27 +50,35 @@ public static Publisher createEmptyPublisher( Supplier} publisher from the given {@link CompletionStage} supplier. - * @param supplier supplies a {@link CompletionStage}. - * @param the type of the item to publish. - * @return A {@link Mono} publisher. + * The publisher created by this method will either succeed with exactly one item or fail with an error. + * + * @param supplier supplies a {@link CompletionStage} that MUST produce a non-null result when completed successfully. + * @param nullResultThrowableSupplier supplies a {@link Throwable} that is used as an error when the supplied completion stage completes successfully with + * null. + * @param the type of the item to publish. + * @return A publisher that succeeds exactly one item or fails with an error. */ - public static Publisher createMono( Supplier> supplier ) + public static Publisher createSingleItemPublisher( Supplier> supplier, Supplier nullResultThrowableSupplier ) { - return Mono.create( sink -> supplier.get().whenComplete( ( item, completionError ) -> { - Throwable error = Futures.completionExceptionCause( completionError ); - if ( item != null ) - { - sink.success( item ); - } - if ( error != null ) - { - sink.error( error ); - } - else - { - sink.success(); - } - } ) ); + return Mono.create( sink -> supplier.get().whenComplete( + ( item, completionError ) -> + { + if ( completionError == null ) + { + if ( item != null ) + { + sink.success( item ); + } + else + { + sink.error( nullResultThrowableSupplier.get() ); + } + } + else + { + Throwable error = Optional.ofNullable( Futures.completionExceptionCause( completionError ) ).orElse( completionError ); + sink.error( error ); + } + } ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java index 92016d3c71..7adc1b7f2a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java @@ -27,21 +27,22 @@ import org.neo4j.driver.internal.util.Futures; -import static org.neo4j.driver.internal.util.Futures.failedFuture; +import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; -import static org.neo4j.driver.internal.reactive.RxUtils.createMono; +import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher; +import static org.neo4j.driver.internal.util.Futures.failedFuture; class RxUtilsTest { @Test - void emptyPublisherShouldComplete() throws Throwable + void emptyPublisherShouldComplete() { Publisher emptyPublisher = createEmptyPublisher( Futures::completedWithNull ); StepVerifier.create( emptyPublisher ).verifyComplete(); } @Test - void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable + void emptyPublisherShouldErrorWhenSupplierErrors() { RuntimeException error = new RuntimeException( "Error" ); Publisher emptyPublisher = createEmptyPublisher( () -> failedFuture( error ) ); @@ -50,18 +51,27 @@ void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable } @Test - void monoPublisherShouldCompleteWithValue() throws Throwable + void singleItemPublisherShouldCompleteWithValue() { - Publisher mono = createMono( () -> CompletableFuture.completedFuture( "One" ) ); - StepVerifier.create( mono ).expectNext( "One" ).verifyComplete(); + Publisher publisher = createSingleItemPublisher( () -> CompletableFuture.completedFuture( "One" ), () -> mock( Throwable.class ) ); + StepVerifier.create( publisher ).expectNext( "One" ).verifyComplete(); } @Test - void monoPublisherShouldErrorIfSupplierErrored() throws Throwable + void singleItemPublisherShouldErrorWhenFutureCompletesWithNull() { - RuntimeException error = new RuntimeException( "Error" ); - Publisher mono = createMono( () -> failedFuture( error ) ); + Throwable error = mock( Throwable.class ); + Publisher publisher = createSingleItemPublisher( Futures::completedWithNull, () -> error ); + + StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError ); + } + + @Test + void singleItemPublisherShouldErrorWhenSupplierErrors() + { + RuntimeException error = mock( RuntimeException.class ); + Publisher publisher = createSingleItemPublisher( () -> failedFuture( error ), () -> mock( Throwable.class ) ); - StepVerifier.create( mono ).verifyErrorMatches( Predicate.isEqual( error ) ); + StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError ); } }