From e7a38189a35907c360557fb76ad78fdf4620370b Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Wed, 17 Apr 2019 14:51:09 +0200 Subject: [PATCH] Renamed `RxResult` to `RxStatementResult` --- .../reactive/AbstractRxStatementRunner.java | 10 +-- .../internal/reactive/InternalRxSession.java | 12 +-- ...lt.java => InternalRxStatementResult.java} | 6 +- .../reactive/InternalRxTransaction.java | 6 +- .../org/neo4j/driver/reactive/RxSession.java | 8 +- .../{RxResult.java => RxStatementResult.java} | 2 +- .../driver/reactive/RxStatementRunner.java | 10 +-- .../integration/ConnectionHandlingIT.java | 12 +-- .../neo4j/driver/integration/SessionIT.java | 4 +- .../integration/reactive/RxSessionIT.java | 8 +- ...ResultIT.java => RxStatementResultIT.java} | 44 +++++----- .../integration/reactive/RxTransactionIT.java | 88 +++++++++---------- .../reactive/InternalRxSessionTest.java | 16 ++-- ...ava => InternalRxStatementResultTest.java} | 32 +++---- .../reactive/InternalRxTransactionTest.java | 16 ++-- .../org/neo4j/driver/stress/RxReadQuery.java | 4 +- .../neo4j/driver/stress/RxReadQueryInTx.java | 4 +- .../driver/stress/RxReadQueryWithRetries.java | 4 +- .../driver/RxTransactionFunctionExample.java | 6 +- 19 files changed, 146 insertions(+), 146 deletions(-) rename driver/src/main/java/org/neo4j/driver/internal/reactive/{InternalRxResult.java => InternalRxStatementResult.java} (95%) rename driver/src/main/java/org/neo4j/driver/reactive/{RxResult.java => RxStatementResult.java} (99%) rename driver/src/test/java/org/neo4j/driver/integration/reactive/{RxResultIT.java => RxStatementResultIT.java} (88%) rename driver/src/test/java/org/neo4j/driver/internal/reactive/{InternalRxResultTest.java => InternalRxStatementResultTest.java} (87%) diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractRxStatementRunner.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractRxStatementRunner.java index 59f2f6bf9d..a8f6b4558b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractRxStatementRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractRxStatementRunner.java @@ -20,7 +20,7 @@ import java.util.Map; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxStatementRunner; import org.neo4j.driver.Record; import org.neo4j.driver.Statement; @@ -31,25 +31,25 @@ public abstract class AbstractRxStatementRunner implements RxStatementRunner { @Override - public final RxResult run( String statementTemplate, Value parameters ) + public final RxStatementResult run( String statementTemplate, Value parameters ) { return run( new Statement( statementTemplate, parameters ) ); } @Override - public final RxResult run( String statementTemplate, Map statementParameters ) + public final RxStatementResult run( String statementTemplate, Map statementParameters ) { return run( statementTemplate, parameters( statementParameters ) ); } @Override - public final RxResult run( String statementTemplate, Record statementParameters ) + public final RxStatementResult run( String statementTemplate, Record statementParameters ) { return run( statementTemplate, parameters( statementParameters ) ); } @Override - public final RxResult run( String statementTemplate ) + public final RxStatementResult run( String statementTemplate ) { return run( new Statement( statementTemplate ) ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index fb1fdb4649..ea877d7243 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -30,7 +30,7 @@ import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.cursor.RxStatementResultCursor; import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; import org.neo4j.driver.reactive.RxTransactionWork; @@ -126,27 +126,27 @@ private Publisher runTransaction( AccessMode mode, RxTransactionWork parameters, TransactionConfig config ) + public RxStatementResult run( String statement, Map parameters, TransactionConfig config ) { return run( new Statement( statement, parameters ), config ); } @Override - public RxResult run( Statement statement ) + public RxStatementResult run( Statement statement ) { return run( statement, TransactionConfig.empty() ); } @Override - public RxResult run( Statement statement, TransactionConfig config ) + public RxStatementResult run( Statement statement, TransactionConfig config ) { - return new InternalRxResult( () -> { + return new InternalRxStatementResult( () -> { CompletableFuture resultCursorFuture = new CompletableFuture<>(); session.runRx( statement, config ).whenComplete( ( cursor, completionError ) -> { if ( cursor != null ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java similarity index 95% rename from driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java rename to driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java index d24fa9b45d..2f2342b644 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java @@ -27,16 +27,16 @@ import org.neo4j.driver.Record; import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.internal.cursor.RxStatementResultCursor; import org.neo4j.driver.summary.ResultSummary; -public class InternalRxResult implements RxResult +public class InternalRxStatementResult implements RxStatementResult { private Supplier> cursorFutureSupplier; private volatile CompletionStage cursorFuture; - public InternalRxResult( Supplier> cursorFuture ) + public InternalRxStatementResult( Supplier> cursorFuture ) { this.cursorFutureSupplier = cursorFuture; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index 7a5322a477..8c7d888e2e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -26,7 +26,7 @@ import org.neo4j.driver.internal.async.ExplicitTransaction; import org.neo4j.driver.internal.cursor.RxStatementResultCursor; import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxTransaction; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; @@ -41,9 +41,9 @@ public InternalRxTransaction( ExplicitTransaction tx ) } @Override - public RxResult run( Statement statement ) + public RxStatementResult run( Statement statement ) { - return new InternalRxResult( () -> { + return new InternalRxStatementResult( () -> { CompletableFuture cursorFuture = new CompletableFuture<>(); tx.runRx( statement ).whenComplete( ( cursor, completionError ) -> { if ( cursor != null ) diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java index 9c04f060f0..69a394adee 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java @@ -32,7 +32,7 @@ /** * A reactive session is the same as {@link Session} except it provides a reactive API. * @see Session - * @see RxResult + * @see RxStatementResult * @see RxTransaction * @see Publisher * @since 2.0 @@ -158,7 +158,7 @@ public interface RxSession extends RxStatementRunner * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run( String statement, TransactionConfig config ); + RxStatementResult run( String statement, TransactionConfig config ); /** * Run a statement with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. @@ -196,7 +196,7 @@ public interface RxSession extends RxStatementRunner * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run( String statement, Map parameters, TransactionConfig config ); + RxStatementResult run( String statement, Map parameters, TransactionConfig config ); /** * Run a statement in an auto-commit transaction with specified {@link TransactionConfig configuration} and return a reactive result stream. @@ -222,7 +222,7 @@ public interface RxSession extends RxStatementRunner * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run( Statement statement, TransactionConfig config ); + RxStatementResult run( Statement statement, TransactionConfig config ); /** * Return the bookmark received following the last completed statement within this session. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java similarity index 99% rename from driver/src/main/java/org/neo4j/driver/reactive/RxResult.java rename to driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java index 2ccf52a9ee..e1456e251e 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java @@ -38,7 +38,7 @@ * @see Subscription * @since 2.0 */ -public interface RxResult +public interface RxStatementResult { /** * Returns a cold publisher of keys. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxStatementRunner.java b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementRunner.java index f7c97f9652..85bdfa7730 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxStatementRunner.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementRunner.java @@ -54,7 +54,7 @@ public interface RxStatementRunner * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. * @return a reactive result. */ - RxResult run( String statementTemplate, Value parameters ); + RxStatementResult run( String statementTemplate, Value parameters ); /** * Register running of a statement and return a reactive result stream. @@ -74,7 +74,7 @@ public interface RxStatementRunner * @param statementParameters input data for the statement * @return a reactive result. */ - RxResult run( String statementTemplate, Map statementParameters ); + RxStatementResult run( String statementTemplate, Map statementParameters ); /** * Register running of a statement and return a reactive result stream. @@ -93,7 +93,7 @@ public interface RxStatementRunner * @param statementParameters input data for the statement * @return a reactive result. */ - RxResult run( String statementTemplate, Record statementParameters ); + RxStatementResult run( String statementTemplate, Record statementParameters ); /** * Register running of a statement and return a reactive result stream. @@ -103,7 +103,7 @@ public interface RxStatementRunner * @param statementTemplate text of a Neo4j statement * @return a reactive result. */ - RxResult run( String statementTemplate ); + RxStatementResult run( String statementTemplate ); /** * Register running of a statement and return a reactive result stream. @@ -113,5 +113,5 @@ public interface RxStatementRunner * @param statement a Neo4j statement * @return a reactive result. */ - RxResult run( Statement statement ); + RxStatementResult run( Statement statement ); } diff --git a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java index 863394a2ff..4d2a86fdcc 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java @@ -47,7 +47,7 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; import org.neo4j.driver.AuthToken; @@ -313,7 +313,7 @@ void connectionUsedForBeginTxReturnedToThePoolWhenSessionClose() void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable { RxSession session = driver.rxSession(); - RxResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); // When we only run but not pull StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete(); @@ -332,7 +332,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable void resultRecordsShouldReleaseConnectionUsedBySessionRun() throws Throwable { RxSession session = driver.rxSession(); - RxResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; assertNull( connection1 ); @@ -350,7 +350,7 @@ void resultRecordsShouldReleaseConnectionUsedBySessionRun() throws Throwable void resultSummaryShouldReleaseConnectionUsedBySessionRun() throws Throwable { RxSession session = driver.rxSession(); - RxResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; assertNull( connection1 ); @@ -371,7 +371,7 @@ void txCommitShouldReleaseConnectionUsedByBeginTx() throws Throwable Connection connection1 = connectionPool.lastAcquiredConnectionSpy; verify( connection1, never() ).release(); - RxResult result = tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult result = tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ); StepVerifier.create( Flux.from( result.records() ).map( record -> record.get( "a" ).asInt() ) ) .expectNext( 1, 2, 3, 4 ).verifyComplete(); @@ -393,7 +393,7 @@ void txRollbackShouldReleaseConnectionUsedByBeginTx() throws Throwable Connection connection1 = connectionPool.lastAcquiredConnectionSpy; verify( connection1, never() ).release(); - RxResult result = tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult result = tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ); StepVerifier.create( Flux.from( result.records() ).map( record -> record.get( "a" ).asInt() ) ) .expectNext( 1, 2, 3, 4 ).verifyComplete(); diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java index 2e5ce53a3f..d20c46236e 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java @@ -59,7 +59,7 @@ import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.summary.StatementType; @@ -1208,7 +1208,7 @@ void shouldErrorWhenTryingToUseRxAPIWithoutBoltV4() throws Throwable { // Given RxSession session = neo4j.driver().rxSession(); - RxResult result = session.run( "RETURN 1" ); + RxStatementResult result = session.run( "RETURN 1" ); // When trying to run the query on a server that is using a protocol that is lower than V4 StepVerifier.create( result.records() ).expectErrorSatisfies( error -> { diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java index 0f5531a802..34db8ede54 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java @@ -38,7 +38,7 @@ import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; import org.neo4j.driver.reactive.RxTransactionWork; @@ -65,7 +65,7 @@ void shouldAllowSessionRun() { // When RxSession session = neo4j.driver().rxSession(); - RxResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); + RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); // Then I should be able to iterate over the result StepVerifier.create( Flux.from( res.records() ).map( r -> r.get( "a" ).asInt() ) ) @@ -82,12 +82,12 @@ void shouldBeAbleToReuseSessionAfterFailure() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult res1 = session.run( "INVALID" ); + RxStatementResult res1 = session.run( "INVALID" ); StepVerifier.create( res1.records() ).expectError( ClientException.class ).verify(); // When - RxResult res2 = session.run( "RETURN 1" ); + RxStatementResult res2 = session.run( "RETURN 1" ); // Then StepVerifier.create( res2.records() ).assertNext( record -> { diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxResultIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java similarity index 88% rename from driver/src/test/java/org/neo4j/driver/integration/reactive/RxResultIT.java rename to driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java index 45713309bf..3391849450 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxResultIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java @@ -27,7 +27,7 @@ import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.summary.StatementType; @@ -47,7 +47,7 @@ @EnabledOnNeo4jWith( BOLT_V4 ) @ParallelizableIT -class RxResultIT +class RxStatementResultIT { @RegisterExtension static final DatabaseExtension neo4j = new DatabaseExtension(); @@ -56,7 +56,7 @@ class RxResultIT void shouldAllowIteratingOverResultStream() { // When - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); // Then I should be able to iterate over the result verifyCanAccessFullRecords( res ); @@ -68,7 +68,7 @@ void shouldAllowIteratingOverLargeResultStream() // When int size = 100000; RxSession session = neo4j.driver().rxSession(); - RxResult res = session.run( "UNWIND range(1, $size) AS x RETURN x", parameters( "size", size ) ); + RxStatementResult res = session.run( "UNWIND range(1, $size) AS x RETURN x", parameters( "size", size ) ); // Then I should be able to iterate over the result StepVerifier.FirstStep step = StepVerifier.create( Flux.from( res.records() ).limitRate( 100 ).map( r -> r.get( "x" ).asInt() ) ); @@ -84,7 +84,7 @@ void shouldAllowIteratingOverLargeResultStream() void shouldReturnKeysRecordsAndSummaryInOrder() { // When - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); // Then I should be able to iterate over the result verifyCanAccessKeys( res ); @@ -96,7 +96,7 @@ void shouldReturnKeysRecordsAndSummaryInOrder() void shouldSecondVisitOfRecordReceiveEmptyRecordStream() throws Throwable { // When - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); // Then I should be able to iterate over the result verifyCanAccessFullRecords( res ); @@ -108,7 +108,7 @@ void shouldSecondVisitOfRecordReceiveEmptyRecordStream() throws Throwable void shouldReturnKeysSummaryAndDiscardRecords() { // When - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); verifyCanAccessKeys( res ); verifyCanAccessSummary( res ); @@ -119,7 +119,7 @@ void shouldReturnKeysSummaryAndDiscardRecords() void shouldAllowOnlySummary() { // When - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); verifyCanAccessSummary( res ); } @@ -128,7 +128,7 @@ void shouldAllowOnlySummary() void shouldAllowAccessKeysAndSummaryAfterRecord() throws Throwable { // Given - RxResult res = sessionRunUnwind(); + RxStatementResult res = sessionRunUnwind(); // Then I should be able to iterate over the result verifyCanAccessFullRecords( res ); @@ -147,7 +147,7 @@ void shouldGiveHelpfulFailureMessageWhenAccessNonExistingField() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult rs = + RxStatementResult rs = session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) ); // When @@ -162,7 +162,7 @@ void shouldGiveHelpfulFailureMessageWhenAccessNonExistingPropertyOnNode() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult rs = + RxStatementResult rs = session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) ); // When @@ -177,7 +177,7 @@ void shouldHaveFieldNamesInResult() { // When RxSession session = neo4j.driver().rxSession(); - RxResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" ); + RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" ); // Then StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify(); @@ -194,7 +194,7 @@ void shouldReturnEmptyKeyAndRecordOnEmptyResult() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult rs = session.run( "CREATE (n:Person {name:{name}})", parameters( "name", "Tom Hanks" ) ); + RxStatementResult rs = session.run( "CREATE (n:Person {name:{name}})", parameters( "name", "Tom Hanks" ) ); // Then StepVerifier.create( rs.keys() ).expectComplete().verify(); @@ -206,7 +206,7 @@ void shouldOnlyErrorRecordAfterFailure() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult result = session.run( "INVALID" ); + RxStatementResult result = session.run( "INVALID" ); // When Flux keys = Flux.from( result.keys() ); @@ -235,7 +235,7 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable { // Given RxSession session = neo4j.driver().rxSession(); - RxResult result = session.run( "INVALID" ); + RxStatementResult result = session.run( "INVALID" ); // When Flux keys = Flux.from( result.keys() ); @@ -261,7 +261,7 @@ void shouldDiscardRecords() { // Given RxSession session = neo4j.driver().rxSession(); - RxResult result = session.run("UNWIND [1,2] AS a RETURN a"); + RxStatementResult result = session.run("UNWIND [1,2] AS a RETURN a"); // When StepVerifier.create( Flux.from( result.records() ) @@ -285,7 +285,7 @@ void shouldStreamCorrectRecordsBackBeforeError() { RxSession session = neo4j.driver().rxSession(); - RxResult result = session.run( "UNWIND range(5, 0, -1) AS x RETURN x / x" ); + RxStatementResult result = session.run( "UNWIND range(5, 0, -1) AS x RETURN x / x" ); StepVerifier.create( Flux.from( result.records() ).map( record -> record.get( 0 ).asInt() ) ) .expectNext( 1 ) .expectNext( 1 ) @@ -298,7 +298,7 @@ void shouldStreamCorrectRecordsBackBeforeError() .verify(); } - private void verifyCanAccessSummary( RxResult res ) + private void verifyCanAccessSummary( RxStatementResult res ) { StepVerifier.create( res.summary() ).assertNext( summary -> { assertThat( summary.statement().text(), equalTo( "UNWIND [1,2,3,4] AS a RETURN a" ) ); @@ -307,25 +307,25 @@ private void verifyCanAccessSummary( RxResult res ) } ).verifyComplete(); } - private void verifyRecordsAlreadyDiscarded( RxResult res ) + private void verifyRecordsAlreadyDiscarded( RxStatementResult res ) { StepVerifier.create( Flux.from( res.records() ).map( r -> r.get( "a" ).asInt() ) ) .expectComplete() .verify(); } - private void verifyCanAccessFullRecords( RxResult res ) + private void verifyCanAccessFullRecords( RxStatementResult res ) { StepVerifier.create( Flux.from( res.records() ).map( r -> r.get( "a" ).asInt() ) ).expectNext( 1 ).expectNext( 2 ).expectNext( 3 ).expectNext( 4 ).expectComplete().verify(); } - private void verifyCanAccessKeys( RxResult res ) + private void verifyCanAccessKeys( RxStatementResult res ) { StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete(); } - private RxResult sessionRunUnwind() + private RxStatementResult sessionRunUnwind() { RxSession session = neo4j.driver().rxSession(); return session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java index b2e1ead44f..3b635e92fb 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java @@ -44,7 +44,7 @@ import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; import org.neo4j.driver.summary.ResultSummary; @@ -144,13 +144,13 @@ void shouldBePossibleToRunMultipleStatements( boolean commit ) { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); await( cursor1.records() ); - RxResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); + RxStatementResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); await( cursor2.records() ); - RxResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); await( cursor3.records() ); assertCanCommitOrRollback( commit, tx ); @@ -164,9 +164,9 @@ void shouldBePossibleToRunMultipleStatementsWithoutWaiting( boolean commit ) { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); - RxResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); - RxResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); + RxStatementResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); await( Flux.from( cursor1.records() ).concatWith( cursor2.records() ).concatWith( cursor3.records() ) ); assertCanCommitOrRollback( commit, tx ); @@ -180,9 +180,9 @@ void shouldBePossibleToRunMultipleStatementsWithoutStreaming( boolean commit ) { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); - RxResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); - RxResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor1 = tx.run( "CREATE (n:Node {id: 1})" ); + RxStatementResult cursor2 = tx.run( "CREATE (n:Node {id: 2})" ); + RxStatementResult cursor3 = tx.run( "CREATE (n:Node {id: 1})" ); await( Flux.from( cursor1.keys() ).concatWith( cursor2.keys() ).concatWith( cursor3.keys() ) ); assertCanCommitOrRollback( commit, tx ); @@ -236,7 +236,7 @@ void shouldNotAllowNewStatementsAfterAnIncorrectStatement() RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); assertFailToRunWrongStatement( tx ); - RxResult result = tx.run( "CREATE ()" ); + RxStatementResult result = tx.run( "CREATE ()" ); Exception e = assertThrows( Exception.class, () -> await( result.records() ) ); assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); @@ -317,7 +317,7 @@ void shouldAllowRollbackAfterFailedCommit() void shouldExposeStatementKeysForColumnsWithAliases() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" ); + RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" ); List keys = await( result.keys() ); assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys ); @@ -329,7 +329,7 @@ void shouldExposeStatementKeysForColumnsWithAliases() void shouldExposeStatementKeysForColumnsWithoutAliases() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN 1, 2, 3, 5" ); + RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" ); List keys = await( result.keys() ); assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys ); @@ -344,7 +344,7 @@ void shouldExposeResultSummaryForSimpleQuery() String query = "CREATE (p1:Person {name: $name1})-[:KNOWS]->(p2:Person {name: $name2}) RETURN p1, p2"; Value params = parameters( "name1", "Bob", "name2", "John" ); - RxResult result = tx.run( query, params ); + RxStatementResult result = tx.run( query, params ); await( result.records() ); // we run and stream ResultSummary summary = await( Mono.from( result.summary() ) ); @@ -372,7 +372,7 @@ void shouldExposeResultSummaryForExplainQuery() RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); String query = "EXPLAIN MATCH (n) RETURN n"; - RxResult result = tx.run( query ); + RxStatementResult result = tx.run( query ); await( result.records() ); // we run and stream ResultSummary summary = await( Mono.from( result.summary() ) ); @@ -404,7 +404,7 @@ void shouldExposeResultSummaryForProfileQuery() Value params = parameters( "name", "Bob" ); - RxResult result = tx.run( query, params ); + RxStatementResult result = tx.run( query, params ); await( result.records() ); // we run and stream ResultSummary summary = await( Mono.from( result.summary() ) ); @@ -432,7 +432,7 @@ void shouldExposeResultSummaryForProfileQuery() void shouldCancelRecordStream() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ); + RxStatementResult result = tx.run( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ); Flux abc = Flux.from( result.records() ).limitRate( 1 ).take( 1 ).map( record -> record.get( 0 ).asString() ); StepVerifier.create( abc ).expectNext( "a" ).verifyComplete(); @@ -481,7 +481,7 @@ void shouldConvertToListWithNonEmptyCursor() void shouldConvertToTransformedListWithEmptyCursor() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "CREATE ()" ); + RxStatementResult result = tx.run( "CREATE ()" ); List> maps = await( Flux.from( result.records() ).map( record -> record.get( 0 ).asMap() ) ); assertEquals( 0, maps.size() ); assertCanRollback( tx ); @@ -491,7 +491,7 @@ void shouldConvertToTransformedListWithEmptyCursor() void shouldConvertToTransformedListWithNonEmptyCursor() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ); + RxStatementResult result = tx.run( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ); List strings = await( Flux.from( result.records() ).map( record -> record.get( 0 ).asString() + "!" ) ); assertEquals( Arrays.asList( "a!", "b!", "c!" ), strings ); @@ -516,7 +516,7 @@ void shouldFailWhenListTransformationFunctionFails() void shouldFailToCommitWhenServerIsRestarted() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN 1" ); + RxStatementResult result = tx.run( "RETURN 1" ); assertThrows( ServiceUnavailableException.class, () -> { await( Flux.from( result.records() ).doOnSubscribe( subscription -> { @@ -532,7 +532,7 @@ void shouldFailToCommitWhenServerIsRestarted() void shouldFailSingleWithEmptyCursor() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "MATCH (n:NoSuchLabel) RETURN n" ); + RxStatementResult result = tx.run( "MATCH (n:NoSuchLabel) RETURN n" ); NoSuchElementException e = assertThrows( NoSuchElementException.class, () -> await( Flux.from( result.records() ).single() ) ); assertThat( e.getMessage(), containsString( "Source was empty" ) ); @@ -543,7 +543,7 @@ void shouldFailSingleWithEmptyCursor() void shouldFailSingleWithMultiRecordCursor() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "UNWIND ['a', 'b'] AS x RETURN x" ); + RxStatementResult result = tx.run( "UNWIND ['a', 'b'] AS x RETURN x" ); IndexOutOfBoundsException e = assertThrows( IndexOutOfBoundsException.class, () -> await( Flux.from( result.records() ).single() ) ); assertThat( e.getMessage(), startsWith( "Source emitted more than one item" ) ); @@ -554,7 +554,7 @@ void shouldFailSingleWithMultiRecordCursor() void shouldReturnSingleWithSingleRecordCursor() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN 'Hello!'" ); + RxStatementResult result = tx.run( "RETURN 'Hello!'" ); Record record = await( Flux.from( result.records() ).single() ); assertEquals( "Hello!", record.get( 0 ).asString() ); @@ -565,7 +565,7 @@ void shouldReturnSingleWithSingleRecordCursor() void shouldPropagateFailureFromFirstRecordInSingleAsync() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "UNWIND [0] AS x RETURN 10 / x" ); + RxStatementResult result = tx.run( "UNWIND [0] AS x RETURN 10 / x" ); ClientException e = assertThrows( ClientException.class, () -> await( Flux.from( result.records() ).single() ) ); assertThat( e.getMessage(), containsString( "/ by zero" ) ); @@ -576,7 +576,7 @@ void shouldPropagateFailureFromFirstRecordInSingleAsync() void shouldPropagateFailureFromSecondRecordInSingleAsync() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "UNWIND [1, 0] AS x RETURN 10 / x" ); + RxStatementResult result = tx.run( "UNWIND [1, 0] AS x RETURN 10 / x" ); ClientException e = assertThrows( ClientException.class, () -> await( Flux.from( result.records() ).single() ) ); assertThat( e.getMessage(), containsString( "/ by zero" ) ); @@ -661,7 +661,7 @@ void shouldRollbackAfterTermination() void shouldFailToRunQueryAfterCommit( boolean commit ) { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "CREATE (:MyLabel)" ); + RxStatementResult result = tx.run( "CREATE (:MyLabel)" ); await( result.records() ); assertCanCommitOrRollback( commit, tx ); @@ -713,10 +713,10 @@ void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws InterruptedEx { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result1 = tx.run( "CREATE (:TestNode)" ); - RxResult result2 = tx.run( "CREATE (:TestNode)" ); - RxResult result3 = tx.run( "RETURN 10 / 0" ); - RxResult result4 = tx.run( "CREATE (:TestNode)" ); + RxStatementResult result1 = tx.run( "CREATE (:TestNode)" ); + RxStatementResult result2 = tx.run( "CREATE (:TestNode)" ); + RxStatementResult result3 = tx.run( "RETURN 10 / 0" ); + RxStatementResult result4 = tx.run( "CREATE (:TestNode)" ); Flux records = Flux.from( result1.records() ).concatWith( result2.records() ).concatWith( result3.records() ).concatWith( result4.records() ); @@ -730,10 +730,10 @@ void shouldNotRunUntilPublisherIsConnected() throws Throwable { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result1 = tx.run( "RETURN 1" ); - RxResult result2 = tx.run( "RETURN 2" ); - RxResult result3 = tx.run( "RETURN 3" ); - RxResult result4 = tx.run( "RETURN 4" ); + RxStatementResult result1 = tx.run( "RETURN 1" ); + RxStatementResult result2 = tx.run( "RETURN 2" ); + RxStatementResult result3 = tx.run( "RETURN 3" ); + RxStatementResult result4 = tx.run( "RETURN 4" ); Flux records = Flux.from( result4.records() ).concatWith( result3.records() ).concatWith( result2.records() ).concatWith( result1.records() ); @@ -761,7 +761,7 @@ void shouldNotPropagateRunFailureIfNotExecuted( boolean commit ) void shouldPropagateRunFailureOnRecord() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN 42 / 0" ); + RxStatementResult result = tx.run( "RETURN 42 / 0" ); await( result.keys() ); // always returns keys ClientException e = assertThrows( ClientException.class, () -> await( result.records() ) ); @@ -773,7 +773,7 @@ void shouldPropagateRunFailureOnRecord() void shouldFailToCommitWhenPullAllFailureIsConsumed() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))" ); + RxStatementResult result = tx.run( "FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))" ); ClientException e1 = assertThrows( ClientException.class, () -> await( result.records() ) ); assertThat( e1.code(), containsString( "TypeError" ) ); @@ -789,7 +789,7 @@ void shouldNotPropagateRunFailureFromSummary() { RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); - RxResult result = tx.run( "RETURN Wrong" ); + RxStatementResult result = tx.run( "RETURN Wrong" ); ClientException e = assertThrows( ClientException.class, () -> await( result.records() ) ); assertThat( e.code(), containsString( "SyntaxError" ) ); @@ -804,10 +804,10 @@ void shouldHandleNestedQueries() throws Throwable Flux nodeIds = Flux.usingWhen( session.beginTransaction(), tx -> { - RxResult result = tx.run( "UNWIND range(1, $size) AS x RETURN x", Collections.singletonMap( "size", size ) ); + RxStatementResult result = tx.run( "UNWIND range(1, $size) AS x RETURN x", Collections.singletonMap( "size", size ) ); return Flux.from( result.records() ).limitRate( 20 ).flatMap( record -> { int x = record.get( "x" ).asInt(); - RxResult innerResult = tx.run( "CREATE (n:Node {id: $x}) RETURN n.id", Collections.singletonMap( "x", x ) ); + RxStatementResult innerResult = tx.run( "CREATE (n:Node {id: $x}) RETURN n.id", Collections.singletonMap( "x", x ) ); return innerResult.records(); } ).map( record -> record.get( 0 ).asInt() ); }, @@ -819,7 +819,7 @@ void shouldHandleNestedQueries() throws Throwable private int countNodes( Object id ) { - RxResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); + RxStatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); return await( Flux.from( result.records() ).single().map( record -> record.get( 0 ).asInt() ) ); } @@ -827,7 +827,7 @@ private void testForEach( String query, int expectedSeenRecords ) { Flux summary = Flux.usingWhen( session.beginTransaction(), tx -> { - RxResult result = tx.run( query ); + RxStatementResult result = tx.run( query ); AtomicInteger recordsSeen = new AtomicInteger(); return Flux.from( result.records() ) .doOnNext( record -> recordsSeen.incrementAndGet() ) @@ -916,7 +916,7 @@ private static Stream commit() private static void assertCanRunCreate( RxTransaction tx ) { - RxResult result = tx.run( "CREATE (n:Node {id: 4242}) RETURN n" ); + RxStatementResult result = tx.run( "CREATE (n:Node {id: 4242}) RETURN n" ); Record record = await( Flux.from(result.records()).single() ); @@ -927,14 +927,14 @@ private static void assertCanRunCreate( RxTransaction tx ) private static void assertFailToRunWrongStatement( RxTransaction tx ) { - RxResult result = tx.run( "RETURN" ); + RxStatementResult result = tx.run( "RETURN" ); Exception e = assertThrows( Exception.class, () -> await( result.records() ) ); assertThat( e, is( syntaxError( "Unexpected end of input" ) ) ); } private void assertCanRunReturnOne( RxTransaction tx ) { - RxResult result = tx.run( "RETURN 42" ); + RxStatementResult result = tx.run( "RETURN 42" ); List records = await( result.records() ); assertThat( records.size(), equalTo( 1 ) ); Record record = records.get( 0 ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index e4f5cde24d..95f60df326 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -44,7 +44,7 @@ import org.neo4j.driver.internal.util.FixedRetryLogic; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; @@ -66,7 +66,7 @@ class InternalRxSessionTest { - private static Stream> allSessionRunMethods() + private static Stream> allSessionRunMethods() { return Stream.of( rxSession -> rxSession.run( "RETURN 1" ), @@ -101,7 +101,7 @@ private static Stream>> allRunTxMethods() @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldDelegateRun( Function runReturnOne ) throws Throwable + void shouldDelegateRun( Function runReturnOne ) throws Throwable { // Given NetworkSession session = mock( NetworkSession.class ); @@ -112,9 +112,9 @@ void shouldDelegateRun( Function runReturnOne ) throws Throw InternalRxSession rxSession = new InternalRxSession( session ); // When - RxResult result = runReturnOne.apply( rxSession ); + RxStatementResult result = runReturnOne.apply( rxSession ); // Execute the run - CompletionStage cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); + CompletionStage cursorFuture = ((InternalRxStatementResult) result).cursorFutureSupplier().get(); // Then verify( session ).runRx( any( Statement.class ), any( TransactionConfig.class ) ); @@ -123,7 +123,7 @@ void shouldDelegateRun( Function runReturnOne ) throws Throw @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldReleaseConnectionIfFailedToRun( Function runReturnOne ) throws Throwable + void shouldReleaseConnectionIfFailedToRun( Function runReturnOne ) throws Throwable { // Given Throwable error = new RuntimeException( "Hi there" ); @@ -136,9 +136,9 @@ void shouldReleaseConnectionIfFailedToRun( Function runRetur InternalRxSession rxSession = new InternalRxSession( session ); // When - RxResult result = runReturnOne.apply( rxSession ); + RxStatementResult result = runReturnOne.apply( rxSession ); // Execute the run - CompletionStage cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); + CompletionStage cursorFuture = ((InternalRxStatementResult) result).cursorFutureSupplier().get(); // Then verify( session ).runRx( any( Statement.class ), any( TransactionConfig.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java similarity index 87% rename from driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java rename to driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java index 73bae7427c..34a4b51830 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java @@ -32,7 +32,7 @@ import org.neo4j.driver.internal.handlers.RunResponseHandler; import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler; import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler; import org.neo4j.driver.internal.cursor.RxStatementResultCursor; import org.neo4j.driver.Record; @@ -51,14 +51,14 @@ import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.Values.values; -class InternalRxResultTest +class InternalRxStatementResultTest { @Test void shouldInitCursorFuture() { // Given RxStatementResultCursor cursor = mock( RxStatementResultCursor.class ); - InternalRxResult rxResult = newRxResult( cursor ); + InternalRxStatementResult rxResult = newRxResult( cursor ); // When CompletableFuture cursorFuture = rxResult.initCursorFuture().toCompletableFuture(); @@ -73,7 +73,7 @@ void shouldInitCursorFutureWithFailedCursor() { // Given RuntimeException error = new RuntimeException( "Failed to obtain cursor probably due to connection problem" ); - InternalRxResult rxResult = newRxResult( error ); + InternalRxStatementResult rxResult = newRxResult( error ); // When CompletableFuture cursorFuture = rxResult.initCursorFuture().toCompletableFuture(); @@ -89,7 +89,7 @@ void shouldObtainKeys() { // Given RxStatementResultCursor cursor = mock( RxStatementResultCursor.class ); - RxResult rxResult = newRxResult( cursor ); + RxStatementResult rxResult = newRxResult( cursor ); List keys = Arrays.asList( "one", "two", "three" ); when( cursor.keys() ).thenReturn( keys ); @@ -107,7 +107,7 @@ void shouldErrorWhenFailedObtainKeys() { // Given RuntimeException error = new RuntimeException( "Failed to obtain cursor" ); - InternalRxResult rxResult = newRxResult( error ); + InternalRxStatementResult rxResult = newRxResult( error ); // When & Then StepVerifier.create( Flux.from( rxResult.keys() ) ) @@ -120,7 +120,7 @@ void shouldCancelKeys() { // Given RxStatementResultCursor cursor = mock( RxStatementResultCursor.class ); - RxResult rxResult = newRxResult( cursor ); + RxStatementResult rxResult = newRxResult( cursor ); List keys = Arrays.asList( "one", "two", "three" ); when( cursor.keys() ).thenReturn( keys ); @@ -140,7 +140,7 @@ void shouldObtainRecordsAndSummary() Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); BasicPullResponseHandler pullHandler = new ListBasedPullHandler( Arrays.asList( record1, record2, record3 ) ); - RxResult rxResult = newRxResult( pullHandler ); + RxStatementResult rxResult = newRxResult( pullHandler ); // When StepVerifier.create( Flux.from( rxResult.records() ) ) @@ -160,7 +160,7 @@ void shouldCancelStreamingButObtainSummary() Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) ); BasicPullResponseHandler pullHandler = new ListBasedPullHandler( Arrays.asList( record1, record2, record3 ) ); - RxResult rxResult = newRxResult( pullHandler ); + RxStatementResult rxResult = newRxResult( pullHandler ); // When StepVerifier.create( Flux.from( rxResult.records() ).limitRate( 1 ).take( 1 ) ) @@ -174,7 +174,7 @@ void shouldErrorIfFailedToCreateCursor() { // Given Throwable error = new RuntimeException( "Hi" ); - RxResult rxResult = newRxResult( error ); + RxStatementResult rxResult = newRxResult( error ); // When & Then StepVerifier.create( Flux.from( rxResult.records() ) ).expectErrorMatches( isEqual( error ) ).verify(); @@ -186,7 +186,7 @@ void shouldErrorIfFailedToStream() { // Given Throwable error = new RuntimeException( "Hi" ); - RxResult rxResult = newRxResult( new ListBasedPullHandler( error ) ); + RxStatementResult rxResult = newRxResult( new ListBasedPullHandler( error ) ); // When & Then StepVerifier.create( Flux.from( rxResult.records() ) ).expectErrorMatches( isEqual( error ) ).verify(); @@ -195,7 +195,7 @@ void shouldErrorIfFailedToStream() } ).verifyComplete(); } - private InternalRxResult newRxResult( BasicPullResponseHandler pullHandler ) + private InternalRxStatementResult newRxResult( BasicPullResponseHandler pullHandler ) { RunResponseHandler runHandler = mock( RunResponseHandler.class ); when( runHandler.runFuture() ).thenReturn( Futures.completedWithNull() ); @@ -203,17 +203,17 @@ private InternalRxResult newRxResult( BasicPullResponseHandler pullHandler ) return newRxResult( cursor ); } - private InternalRxResult newRxResult( RxStatementResultCursor cursor ) + private InternalRxStatementResult newRxResult( RxStatementResultCursor cursor ) { - return new InternalRxResult( () -> { + return new InternalRxStatementResult( () -> { // now we successfully run return completedFuture( cursor ); } ); } - private InternalRxResult newRxResult( Throwable error ) + private InternalRxStatementResult newRxResult( Throwable error ) { - return new InternalRxResult( () -> { + return new InternalRxStatementResult( () -> { // now we successfully run return failedFuture( new CompletionException( error ) ); } ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index cd4d5a736f..02c28719cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -36,7 +36,7 @@ import org.neo4j.driver.internal.cursor.RxStatementResultCursor; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxTransaction; import static java.util.Collections.singletonList; @@ -79,7 +79,7 @@ void rollbackShouldDelegate() verify( tx ).rollbackAsync(); } - private static Stream> allTxRunMethods() + private static Stream> allTxRunMethods() { return Stream.of( rxSession -> rxSession.run( "RETURN 1" ), @@ -93,7 +93,7 @@ private static Stream> allTxRunMethods() @ParameterizedTest @MethodSource( "allTxRunMethods" ) - void shouldDelegateRun( Function runReturnOne ) throws Throwable + void shouldDelegateRun( Function runReturnOne ) throws Throwable { // Given ExplicitTransaction tx = mock( ExplicitTransaction.class ); @@ -104,9 +104,9 @@ void shouldDelegateRun( Function runReturnOne ) throws T InternalRxTransaction rxTx = new InternalRxTransaction( tx ); // When - RxResult result = runReturnOne.apply( rxTx ); + RxStatementResult result = runReturnOne.apply( rxTx ); // Execute the run - CompletionStage cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); + CompletionStage cursorFuture = ((InternalRxStatementResult) result).cursorFutureSupplier().get(); // Then verify( tx ).runRx( any( Statement.class ) ); @@ -115,7 +115,7 @@ void shouldDelegateRun( Function runReturnOne ) throws T @ParameterizedTest @MethodSource( "allTxRunMethods" ) - void shouldMarkTxIfFailedToRun( Function runReturnOne ) throws Throwable + void shouldMarkTxIfFailedToRun( Function runReturnOne ) throws Throwable { // Given Throwable error = new RuntimeException( "Hi there" ); @@ -126,9 +126,9 @@ void shouldMarkTxIfFailedToRun( Function runReturnOne ) InternalRxTransaction rxTx = new InternalRxTransaction( tx ); // When - RxResult result = runReturnOne.apply( rxTx ); + RxStatementResult result = runReturnOne.apply( rxTx ); // Execute the run - CompletionStage cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); + CompletionStage cursorFuture = ((InternalRxStatementResult) result).cursorFutureSupplier().get(); // Then verify( tx ).runRx( any( Statement.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxReadQuery.java b/driver/src/test/java/org/neo4j/driver/stress/RxReadQuery.java index 0bca831f14..14e443ab3b 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxReadQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxReadQuery.java @@ -27,7 +27,7 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.types.Node; @@ -56,7 +56,7 @@ public CompletionStage execute( C context ) private Publisher processAndGetSummary( RxSession session ) { - RxResult result = session.run( "MATCH (n) RETURN n LIMIT 1" ); + RxStatementResult result = session.run( "MATCH (n) RETURN n LIMIT 1" ); Mono records = Flux.from( result.records() ).singleOrEmpty().map( record -> record.get( 0 ).asNode() ); Mono summaryMono = Mono.from( result.summary() ).single(); return records.then( summaryMono ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryInTx.java index 484889a16c..c4e9f6cb30 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryInTx.java @@ -27,7 +27,7 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; import org.neo4j.driver.summary.ResultSummary; @@ -58,7 +58,7 @@ public CompletionStage execute( C context ) private Publisher processAndGetSummary( RxTransaction tx ) { - RxResult result = tx.run( "MATCH (n) RETURN n LIMIT 1" ); + RxStatementResult result = tx.run( "MATCH (n) RETURN n LIMIT 1" ); Mono records = Flux.from( result.records() ).singleOrEmpty().map( record -> record.get( 0 ).asNode() ); Mono summaryMono = Mono.from( result.summary() ).single(); return records.then( summaryMono ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryWithRetries.java index c56ae00430..3781dfe08a 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryWithRetries.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxReadQueryWithRetries.java @@ -27,7 +27,7 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.types.Node; @@ -57,7 +57,7 @@ public CompletionStage execute( C context ) private Publisher processAndGetSummary( RxSession session ) { return session.readTransaction( tx -> { - RxResult result = tx.run( "MATCH (n) RETURN n LIMIT 1" ); + RxStatementResult result = tx.run( "MATCH (n) RETURN n LIMIT 1" ); Mono records = Flux.from( result.records() ).singleOrEmpty().map( record -> record.get( 0 ).asNode() ); Mono summaryMono = Mono.from( result.summary() ).single(); return records.then( summaryMono ); diff --git a/examples/src/main/java/org/neo4j/docs/driver/RxTransactionFunctionExample.java b/examples/src/main/java/org/neo4j/docs/driver/RxTransactionFunctionExample.java index 660aed7af6..2a51dbe2e0 100644 --- a/examples/src/main/java/org/neo4j/docs/driver/RxTransactionFunctionExample.java +++ b/examples/src/main/java/org/neo4j/docs/driver/RxTransactionFunctionExample.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.Map; -import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.ResultSummary; @@ -44,7 +44,7 @@ public Flux printAllProductsReactor() return Flux.using( driver::rxSession, session -> session.readTransaction( tx -> { - RxResult result = tx.run( query, parameters ); + RxStatementResult result = tx.run( query, parameters ); return Flux.from( result.records() ) .doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).then( Mono.from( result.summary() ) ); } @@ -60,7 +60,7 @@ public Flowable printAllProductsRxJava() return Flowable.using( driver::rxSession, session -> session.readTransaction( tx -> { - RxResult result = tx.run( query, parameters ); + RxStatementResult result = tx.run( query, parameters ); return Flowable.fromPublisher( result.records() ) .doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).ignoreElements().andThen( result.summary() ); }