Skip to content

Commit db519ba

Browse files
authored
Fix RxSession.beginTransaction returning an empty Publisher (#932) (#933)
1 parent 6d84252 commit db519ba

File tree

3 files changed

+87
-62
lines changed

3 files changed

+87
-62
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import org.neo4j.driver.Query;
2221
import org.reactivestreams.Publisher;
2322
import reactor.core.publisher.Flux;
2423

2524
import java.util.Map;
2625
import java.util.concurrent.CompletableFuture;
2726

2827
import org.neo4j.driver.AccessMode;
29-
import org.neo4j.driver.TransactionConfig;
3028
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.Query;
30+
import org.neo4j.driver.TransactionConfig;
3131
import org.neo4j.driver.exceptions.TransactionNestingException;
3232
import org.neo4j.driver.internal.async.NetworkSession;
3333
import org.neo4j.driver.internal.cursor.RxResultCursor;
@@ -38,7 +38,7 @@
3838
import org.neo4j.driver.reactive.RxTransactionWork;
3939

4040
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
41-
import static org.neo4j.driver.internal.reactive.RxUtils.createMono;
41+
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
4242

4343
public class InternalRxSession extends AbstractRxQueryRunner implements RxSession
4444
{
@@ -61,40 +61,46 @@ public Publisher<RxTransaction> beginTransaction()
6161
@Override
6262
public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
6363
{
64-
return createMono( () ->
65-
{
66-
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
67-
session.beginTransactionAsync( config ).whenComplete( ( tx, completionError ) -> {
68-
if ( tx != null )
64+
return createSingleItemPublisher(
65+
() ->
6966
{
70-
txFuture.complete( new InternalRxTransaction( tx ) );
71-
}
72-
else
73-
{
74-
releaseConnectionBeforeReturning( txFuture, completionError );
75-
}
76-
} );
77-
return txFuture;
78-
} );
67+
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
68+
session.beginTransactionAsync( config ).whenComplete(
69+
( tx, completionError ) ->
70+
{
71+
if ( tx != null )
72+
{
73+
txFuture.complete( new InternalRxTransaction( tx ) );
74+
}
75+
else
76+
{
77+
releaseConnectionBeforeReturning( txFuture, completionError );
78+
}
79+
} );
80+
return txFuture;
81+
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
7982
}
8083

8184
private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
8285
{
83-
return createMono( () ->
84-
{
85-
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
86-
session.beginTransactionAsync( mode, config ).whenComplete( ( tx, completionError ) -> {
87-
if ( tx != null )
86+
return createSingleItemPublisher(
87+
() ->
8888
{
89-
txFuture.complete( new InternalRxTransaction( tx ) );
90-
}
91-
else
92-
{
93-
releaseConnectionBeforeReturning( txFuture, completionError );
94-
}
95-
} );
96-
return txFuture;
97-
} );
89+
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
90+
session.beginTransactionAsync( mode, config ).whenComplete(
91+
( tx, completionError ) ->
92+
{
93+
if ( tx != null )
94+
{
95+
txFuture.complete( new InternalRxTransaction( tx ) );
96+
}
97+
else
98+
{
99+
releaseConnectionBeforeReturning( txFuture, completionError );
100+
}
101+
} );
102+
return txFuture;
103+
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
98104
}
99105

100106
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.reactivestreams.Publisher;
2222
import reactor.core.publisher.Mono;
2323

24+
import java.util.Optional;
2425
import java.util.concurrent.CompletionStage;
2526
import java.util.function.Supplier;
2627

@@ -49,27 +50,35 @@ public static <T> Publisher<T> createEmptyPublisher( Supplier<CompletionStage<Vo
4950
}
5051

5152
/**
52-
* Create a {@link Mono<T>} publisher from the given {@link CompletionStage<T>} supplier.
53-
* @param supplier supplies a {@link CompletionStage<T>}.
54-
* @param <T> the type of the item to publish.
55-
* @return A {@link Mono<T>} publisher.
53+
* The publisher created by this method will either succeed with exactly one item or fail with an error.
54+
*
55+
* @param supplier supplies a {@link CompletionStage<T>} that MUST produce a non-null result when completed successfully.
56+
* @param nullResultThrowableSupplier supplies a {@link Throwable} that is used as an error when the supplied completion stage completes successfully with
57+
* null.
58+
* @param <T> the type of the item to publish.
59+
* @return A publisher that succeeds exactly one item or fails with an error.
5660
*/
57-
public static <T> Publisher<T> createMono( Supplier<CompletionStage<T>> supplier )
61+
public static <T> Publisher<T> createSingleItemPublisher( Supplier<CompletionStage<T>> supplier, Supplier<Throwable> nullResultThrowableSupplier )
5862
{
59-
return Mono.create( sink -> supplier.get().whenComplete( ( item, completionError ) -> {
60-
Throwable error = Futures.completionExceptionCause( completionError );
61-
if ( item != null )
62-
{
63-
sink.success( item );
64-
}
65-
if ( error != null )
66-
{
67-
sink.error( error );
68-
}
69-
else
70-
{
71-
sink.success();
72-
}
73-
} ) );
63+
return Mono.create( sink -> supplier.get().whenComplete(
64+
( item, completionError ) ->
65+
{
66+
if ( completionError == null )
67+
{
68+
if ( item != null )
69+
{
70+
sink.success( item );
71+
}
72+
else
73+
{
74+
sink.error( nullResultThrowableSupplier.get() );
75+
}
76+
}
77+
else
78+
{
79+
Throwable error = Optional.ofNullable( Futures.completionExceptionCause( completionError ) ).orElse( completionError );
80+
sink.error( error );
81+
}
82+
} ) );
7483
}
7584
}

driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,22 @@
2727

2828
import org.neo4j.driver.internal.util.Futures;
2929

30-
import static org.neo4j.driver.internal.util.Futures.failedFuture;
30+
import static org.mockito.Mockito.mock;
3131
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
32-
import static org.neo4j.driver.internal.reactive.RxUtils.createMono;
32+
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
33+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
3334

3435
class RxUtilsTest
3536
{
3637
@Test
37-
void emptyPublisherShouldComplete() throws Throwable
38+
void emptyPublisherShouldComplete()
3839
{
3940
Publisher<Void> emptyPublisher = createEmptyPublisher( Futures::completedWithNull );
4041
StepVerifier.create( emptyPublisher ).verifyComplete();
4142
}
4243

4344
@Test
44-
void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable
45+
void emptyPublisherShouldErrorWhenSupplierErrors()
4546
{
4647
RuntimeException error = new RuntimeException( "Error" );
4748
Publisher<Void> emptyPublisher = createEmptyPublisher( () -> failedFuture( error ) );
@@ -50,18 +51,27 @@ void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable
5051
}
5152

5253
@Test
53-
void monoPublisherShouldCompleteWithValue() throws Throwable
54+
void singleItemPublisherShouldCompleteWithValue()
5455
{
55-
Publisher<String> mono = createMono( () -> CompletableFuture.completedFuture( "One" ) );
56-
StepVerifier.create( mono ).expectNext( "One" ).verifyComplete();
56+
Publisher<String> publisher = createSingleItemPublisher( () -> CompletableFuture.completedFuture( "One" ), () -> mock( Throwable.class ) );
57+
StepVerifier.create( publisher ).expectNext( "One" ).verifyComplete();
5758
}
5859

5960
@Test
60-
void monoPublisherShouldErrorIfSupplierErrored() throws Throwable
61+
void singleItemPublisherShouldErrorWhenFutureCompletesWithNull()
6162
{
62-
RuntimeException error = new RuntimeException( "Error" );
63-
Publisher<String> mono = createMono( () -> failedFuture( error ) );
63+
Throwable error = mock( Throwable.class );
64+
Publisher<String> publisher = createSingleItemPublisher( Futures::completedWithNull, () -> error );
65+
66+
StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError );
67+
}
68+
69+
@Test
70+
void singleItemPublisherShouldErrorWhenSupplierErrors()
71+
{
72+
RuntimeException error = mock( RuntimeException.class );
73+
Publisher<String> publisher = createSingleItemPublisher( () -> failedFuture( error ), () -> mock( Throwable.class ) );
6474

65-
StepVerifier.create( mono ).verifyErrorMatches( Predicate.isEqual( error ) );
75+
StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError );
6676
}
6777
}

0 commit comments

Comments
 (0)