diff --git a/driver/src/main/java/org/neo4j/driver/exceptions/TransactionNestingException.java b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionNestingException.java index 312a9e7c7f..2c6bb7020d 100644 --- a/driver/src/main/java/org/neo4j/driver/exceptions/TransactionNestingException.java +++ b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionNestingException.java @@ -19,7 +19,7 @@ package org.neo4j.driver.exceptions; /** - * This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit). + * This exception indicates a user is nesting new transaction with an on-going transaction (explicit and/or auto-commit). */ public class TransactionNestingException extends ClientException { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java index f7e7434e08..aa9029f471 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java @@ -22,6 +22,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.function.Supplier; @@ -44,10 +45,10 @@ public InternalRxStatementResult( Supplier keys() + public Publisher> keys() { - return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() ) - .flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) ); + return Mono.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).map( RxStatementResultCursor::keys ) + .onErrorMap( Futures::completionExceptionCause ) ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java index 6f9abda2af..46471611b5 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java @@ -22,6 +22,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.List; + import org.neo4j.driver.Record; import org.neo4j.driver.Statement; import org.neo4j.driver.exceptions.ResultConsumedException; @@ -43,12 +45,13 @@ public interface RxStatementResult { /** * Returns a cold publisher of keys. + * This publisher always publishes one item - a list of keys. The list could be empty which indicates no keys in the result. *

- * When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed. - * This method does not start the record streaming nor publish query execution error. + * When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the statement is sent to the server and executed. + * This method does not start the record streaming nor publish statement execution result. * To retrieve the execution result, either {@link #records()} or {@link #consume()} can be used. - * {@link #records()} starts record streaming and reports query execution error. - * {@link #consume()} skips record streaming and directly reports query execution error. + * {@link #records()} starts record streaming and reports statement execution result. + * {@link #consume()} skips record streaming and directly reports statement execution result. *

* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly. * Consuming the keys without consuming the execution result will result in resource leak. @@ -60,23 +63,23 @@ public interface RxStatementResult * then the buffered keys will be returned. * @return a cold publisher of keys. */ - Publisher keys(); + Publisher> keys(); /** * Returns a cold unicast publisher of records. *

* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, - * the query statement is executed and the query result is streamed back as a record stream followed by a result summary. + * the query statement is executed and the statement result is streamed back as a record stream followed by a result summary. * This record publisher publishes all records in the result and signals the completion. * However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically. *

* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error) * to ensure that the resources used by this result are released correctly. - * Then the session is ready to be used to run more queries. + * Then the session is ready to be used to run more statements. *

* Cancelling of the record streaming will immediately terminate the propagation of new records. - * But it will not cancel the query execution. - * As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished. + * But it will not cancel statement execution on the server. + * When the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error). *

* The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread. * Otherwise network IO might be blocked by application logic. @@ -92,12 +95,12 @@ public interface RxStatementResult /** * Returns a cold publisher of result summary which arrives after all records. *

- * {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned. - * The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion. + * {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the statement followed by the result summary being returned. + * The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on statement execution completion. * As a result, the invocation of {@link #records()} after this method, would receive an {@link ResultConsumedException}. *

- * If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client. - * If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records. + * If subscribed after {@link #keys()}, then the result summary will be published after the statement execution without streaming any record to client. + * If subscribed after {@link #records()}, then the result summary will be published after the statement execution and the streaming of records. *

* Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary. *

diff --git a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java index 3564de3a2e..0dd91e9179 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java @@ -33,6 +33,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.AuthToken; +import org.neo4j.driver.Config; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Logging; +import org.neo4j.driver.Record; +import org.neo4j.driver.Session; +import org.neo4j.driver.StatementResult; +import org.neo4j.driver.StatementRunner; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; import org.neo4j.driver.internal.DriverFactory; @@ -47,23 +57,14 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxSession; +import org.neo4j.driver.reactive.RxStatementResult; import org.neo4j.driver.reactive.RxTransaction; -import org.neo4j.driver.AuthToken; -import org.neo4j.driver.Config; -import org.neo4j.driver.Driver; -import org.neo4j.driver.Logging; -import org.neo4j.driver.Record; -import org.neo4j.driver.Session; -import org.neo4j.driver.StatementResult; -import org.neo4j.driver.StatementRunner; -import org.neo4j.driver.Transaction; -import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.util.DatabaseExtension; import org.neo4j.driver.util.ParallelizableIT; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -75,10 +76,10 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; -import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; import static org.neo4j.driver.Config.defaultConfig; import static org.neo4j.driver.Values.parameters; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; +import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; import static org.neo4j.driver.util.TestUtil.await; @ParallelizableIT @@ -317,7 +318,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" ); // When we only run but not pull - StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete(); + StepVerifier.create( Flux.from( res.keys() ) ).expectNext( singletonList( "a" ) ).verifyComplete(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; verify( connection1, never() ).release(); diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java index 7498fd2995..e103db5332 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java @@ -24,6 +24,8 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.List; + import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; @@ -34,6 +36,8 @@ import org.neo4j.driver.util.DatabaseExtension; import org.neo4j.driver.util.ParallelizableIT; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; @@ -180,7 +184,7 @@ void shouldHaveFieldNamesInResult() RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" ); // Then - StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify(); + StepVerifier.create( res.keys() ).expectNext( singletonList( "n" ) ).expectComplete().verify(); StepVerifier.create( res.records() ) .assertNext( record -> { assertEquals( "[n]", record.keys().toString() ); @@ -197,7 +201,7 @@ void shouldReturnEmptyKeyAndRecordOnEmptyResult() RxStatementResult rs = session.run( "CREATE (n:Person {name:$name})", parameters( "name", "Tom Hanks" ) ); // Then - StepVerifier.create( rs.keys() ).expectComplete().verify(); + StepVerifier.create( rs.keys() ).expectNext( emptyList() ).expectComplete().verify(); StepVerifier.create( rs.records() ).expectComplete().verify(); } @@ -209,12 +213,12 @@ void shouldOnlyErrorRecordAfterFailure() RxStatementResult result = session.run( "INVALID" ); // When - Flux keys = Flux.from( result.keys() ); + Flux> keys = Flux.from( result.keys() ); Flux records = Flux.from( result.records() ); Mono summaryMono = Mono.from( result.consume() ); // Then - StepVerifier.create( keys ).verifyComplete(); + StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete(); StepVerifier.create( records ).expectErrorSatisfies( error -> { assertThat( error, instanceOf( ClientException.class ) ); @@ -238,11 +242,11 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable RxStatementResult result = session.run( "INVALID" ); // When - Flux keys = Flux.from( result.keys() ); + Flux> keys = Flux.from( result.keys() ); Mono summaryMono = Mono.from( result.consume() ); // Then - StepVerifier.create( keys ).verifyComplete(); + StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete(); StepVerifier.create( summaryMono ).expectErrorSatisfies( error -> { assertThat( error, instanceOf( ClientException.class ) ); @@ -322,7 +326,7 @@ private void verifyCanAccessFullRecords( RxStatementResult res ) private void verifyCanAccessKeys( RxStatementResult res ) { - StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete(); + StepVerifier.create( res.keys() ).expectNext( singletonList( "a" ) ).verifyComplete(); } private RxStatementResult sessionRunUnwind() diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java index e5515ecded..222b9ae5f4 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java @@ -360,7 +360,7 @@ void shouldExposeStatementKeysForColumnsWithAliases() RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" ); - List keys = await( result.keys() ); + List keys = await( Mono.from( result.keys() ) ); assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys ); assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed @@ -372,7 +372,7 @@ void shouldExposeStatementKeysForColumnsWithoutAliases() RxTransaction tx = await( Mono.from( session.beginTransaction() ) ); RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" ); - List keys = await( result.keys() ); + List keys = await( Mono.from( result.keys() ) ); assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys ); assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java index 9ff7cd29a3..aa7ed4d9d0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java @@ -51,6 +51,7 @@ import org.neo4j.driver.util.StubServer; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.junit.MatcherAssert.assertThat; @@ -427,11 +428,11 @@ void shouldDiscardIfPullNotFinished() throws Throwable try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) { - Flux keys = Flux.usingWhen( + Flux> keys = Flux.usingWhen( Mono.fromSupplier( driver::rxSession ), session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ), RxSession::close ); - StepVerifier.create( keys ).expectNext( "a" ).verifyComplete(); + StepVerifier.create( keys ).expectNext( singletonList( "a" ) ).verifyComplete(); } finally { diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java index 77724978b8..b0adeafcb3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java @@ -97,9 +97,7 @@ void shouldObtainKeys() // When & Then StepVerifier.create( Flux.from( rxResult.keys() ) ) - .expectNext( "one" ) - .expectNext( "two" ) - .expectNext( "three" ) + .expectNext( Arrays.asList( "one", "two", "three" ) ) .verifyComplete(); } @@ -128,7 +126,7 @@ void shouldCancelKeys() // When & Then StepVerifier.create( Flux.from( rxResult.keys() ).limitRate( 1 ).take( 1 ) ) - .expectNext( "one" ) + .expectNext( Arrays.asList( "one", "two", "three" ) ) .verifyComplete(); }