Skip to content

Fix RxSession.beginTransaction returning an empty Publisher #934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
*/
package org.neo4j.driver.internal.reactive;

import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.cursor.RxResultCursor;
Expand All @@ -38,7 +38,7 @@
import org.neo4j.driver.reactive.RxTransactionWork;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.reactive.RxUtils.createMono;
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;

public class InternalRxSession extends AbstractRxQueryRunner implements RxSession
{
Expand All @@ -61,40 +61,46 @@ public Publisher<RxTransaction> beginTransaction()
@Override
public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
{
return createMono( () ->
{
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( config ).whenComplete( ( tx, completionError ) -> {
if ( tx != null )
return createSingleItemPublisher(
() ->
{
txFuture.complete( new InternalRxTransaction( tx ) );
}
else
{
releaseConnectionBeforeReturning( txFuture, completionError );
}
} );
return txFuture;
} );
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( config ).whenComplete(
( tx, completionError ) ->
{
if ( tx != null )
{
txFuture.complete( new InternalRxTransaction( tx ) );
}
else
{
releaseConnectionBeforeReturning( txFuture, completionError );
}
} );
return txFuture;
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
}

private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
{
return createMono( () ->
{
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( mode, config ).whenComplete( ( tx, completionError ) -> {
if ( tx != null )
return createSingleItemPublisher(
() ->
{
txFuture.complete( new InternalRxTransaction( tx ) );
}
else
{
releaseConnectionBeforeReturning( txFuture, completionError );
}
} );
return txFuture;
} );
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( mode, config ).whenComplete(
( tx, completionError ) ->
{
if ( tx != null )
{
txFuture.complete( new InternalRxTransaction( tx ) );
}
else
{
releaseConnectionBeforeReturning( txFuture, completionError );
}
} );
return txFuture;
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

Expand Down Expand Up @@ -49,27 +50,35 @@ public static <T> Publisher<T> createEmptyPublisher( Supplier<CompletionStage<Vo
}

/**
* Create a {@link Mono<T>} publisher from the given {@link CompletionStage<T>} supplier.
* @param supplier supplies a {@link CompletionStage<T>}.
* @param <T> the type of the item to publish.
* @return A {@link Mono<T>} publisher.
* The publisher created by this method will either succeed with exactly one item or fail with an error.
*
* @param supplier supplies a {@link CompletionStage<T>} that MUST produce a non-null result when completed successfully.
* @param nullResultThrowableSupplier supplies a {@link Throwable} that is used as an error when the supplied completion stage completes successfully with
* null.
* @param <T> the type of the item to publish.
* @return A publisher that succeeds exactly one item or fails with an error.
*/
public static <T> Publisher<T> createMono( Supplier<CompletionStage<T>> supplier )
public static <T> Publisher<T> createSingleItemPublisher( Supplier<CompletionStage<T>> supplier, Supplier<Throwable> nullResultThrowableSupplier )
{
return Mono.create( sink -> supplier.get().whenComplete( ( item, completionError ) -> {
Throwable error = Futures.completionExceptionCause( completionError );
if ( item != null )
{
sink.success( item );
}
if ( error != null )
{
sink.error( error );
}
else
{
sink.success();
}
} ) );
return Mono.create( sink -> supplier.get().whenComplete(
( item, completionError ) ->
{
if ( completionError == null )
{
if ( item != null )
{
sink.success( item );
}
else
{
sink.error( nullResultThrowableSupplier.get() );
}
}
else
{
Throwable error = Optional.ofNullable( Futures.completionExceptionCause( completionError ) ).orElse( completionError );
sink.error( error );
}
} ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,22 @@

import org.neo4j.driver.internal.util.Futures;

import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.mockito.Mockito.mock;
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.reactive.RxUtils.createMono;
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

class RxUtilsTest
{
@Test
void emptyPublisherShouldComplete() throws Throwable
void emptyPublisherShouldComplete()
{
Publisher<Void> emptyPublisher = createEmptyPublisher( Futures::completedWithNull );
StepVerifier.create( emptyPublisher ).verifyComplete();
}

@Test
void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable
void emptyPublisherShouldErrorWhenSupplierErrors()
{
RuntimeException error = new RuntimeException( "Error" );
Publisher<Void> emptyPublisher = createEmptyPublisher( () -> failedFuture( error ) );
Expand All @@ -50,18 +51,27 @@ void emptyPublisherShouldErrorIfSupplierErrored() throws Throwable
}

@Test
void monoPublisherShouldCompleteWithValue() throws Throwable
void singleItemPublisherShouldCompleteWithValue()
{
Publisher<String> mono = createMono( () -> CompletableFuture.completedFuture( "One" ) );
StepVerifier.create( mono ).expectNext( "One" ).verifyComplete();
Publisher<String> publisher = createSingleItemPublisher( () -> CompletableFuture.completedFuture( "One" ), () -> mock( Throwable.class ) );
StepVerifier.create( publisher ).expectNext( "One" ).verifyComplete();
}

@Test
void monoPublisherShouldErrorIfSupplierErrored() throws Throwable
void singleItemPublisherShouldErrorWhenFutureCompletesWithNull()
{
RuntimeException error = new RuntimeException( "Error" );
Publisher<String> mono = createMono( () -> failedFuture( error ) );
Throwable error = mock( Throwable.class );
Publisher<String> publisher = createSingleItemPublisher( Futures::completedWithNull, () -> error );

StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError );
}

@Test
void singleItemPublisherShouldErrorWhenSupplierErrors()
{
RuntimeException error = mock( RuntimeException.class );
Publisher<String> publisher = createSingleItemPublisher( () -> failedFuture( error ), () -> mock( Throwable.class ) );

StepVerifier.create( mono ).verifyErrorMatches( Predicate.isEqual( error ) );
StepVerifier.create( publisher ).verifyErrorMatches( actualError -> error == actualError );
}
}