Skip to content

Change RxResult#keys to return a single list of keys #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.neo4j.driver.exceptions;

/**
* This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit).
* This exception indicates a user is nesting new transaction with an on-going transaction (explicit and/or auto-commit).
*/
public class TransactionNestingException extends ClientException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

Expand All @@ -44,10 +45,10 @@ public InternalRxStatementResult( Supplier<CompletionStage<RxStatementResultCurs
}

@Override
public Publisher<String> keys()
public Publisher<List<String>> keys()
{
return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() )
.flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
return Mono.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).map( RxStatementResultCursor::keys )
.onErrorMap( Futures::completionExceptionCause ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.List;

import org.neo4j.driver.Record;
import org.neo4j.driver.Statement;
import org.neo4j.driver.exceptions.ResultConsumedException;
Expand All @@ -43,12 +45,13 @@ public interface RxStatementResult
{
/**
* Returns a cold publisher of keys.
* This publisher always publishes one item - a list of keys. The list could be empty which indicates no keys in the result.
* <p>
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed.
* This method does not start the record streaming nor publish query execution error.
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the statement is sent to the server and executed.
* This method does not start the record streaming nor publish statement execution result.
* To retrieve the execution result, either {@link #records()} or {@link #consume()} can be used.
* {@link #records()} starts record streaming and reports query execution error.
* {@link #consume()} skips record streaming and directly reports query execution error.
* {@link #records()} starts record streaming and reports statement execution result.
* {@link #consume()} skips record streaming and directly reports statement execution result.
* <p>
* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly.
* Consuming the keys without consuming the execution result will result in resource leak.
Expand All @@ -60,23 +63,23 @@ public interface RxStatementResult
* then the buffered keys will be returned.
* @return a cold publisher of keys.
*/
Publisher<String> keys();
Publisher<List<String>> keys();

/**
* Returns a cold unicast publisher of records.
* <p>
* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed},
* the query statement is executed and the query result is streamed back as a record stream followed by a result summary.
* the query statement is executed and the statement result is streamed back as a record stream followed by a result summary.
* This record publisher publishes all records in the result and signals the completion.
* However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically.
* <p>
* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error)
* to ensure that the resources used by this result are released correctly.
* Then the session is ready to be used to run more queries.
* Then the session is ready to be used to run more statements.
* <p>
* Cancelling of the record streaming will immediately terminate the propagation of new records.
* But it will not cancel the query execution.
* As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished.
* But it will not cancel statement execution on the server.
* When the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error).
* <p>
* The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread.
* Otherwise network IO might be blocked by application logic.
Expand All @@ -92,12 +95,12 @@ public interface RxStatementResult
/**
* Returns a cold publisher of result summary which arrives after all records.
* <p>
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned.
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion.
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the statement followed by the result summary being returned.
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on statement execution completion.
* As a result, the invocation of {@link #records()} after this method, would receive an {@link ResultConsumedException}.
* <p>
* If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client.
* If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records.
* If subscribed after {@link #keys()}, then the result summary will be published after the statement execution without streaming any record to client.
* If subscribed after {@link #records()}, then the result summary will be published after the statement execution and the streaming of records.
* <p>
* Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.StatementRunner;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.DriverFactory;
Expand All @@ -47,23 +57,14 @@
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.reactive.RxStatementResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxStatementResult;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.StatementRunner;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;

import static java.util.Collections.singletonList;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -75,10 +76,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
import static org.neo4j.driver.Config.defaultConfig;
import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
import static org.neo4j.driver.util.TestUtil.await;

@ParallelizableIT
Expand Down Expand Up @@ -317,7 +318,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable
RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" );

// When we only run but not pull
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete();
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( singletonList( "a" ) ).verifyComplete();
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
verify( connection1, never() ).release();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.List;

import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
Expand All @@ -34,6 +36,8 @@
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -180,7 +184,7 @@ void shouldHaveFieldNamesInResult()
RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" );

// Then
StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify();
StepVerifier.create( res.keys() ).expectNext( singletonList( "n" ) ).expectComplete().verify();
StepVerifier.create( res.records() )
.assertNext( record -> {
assertEquals( "[n]", record.keys().toString() );
Expand All @@ -197,7 +201,7 @@ void shouldReturnEmptyKeyAndRecordOnEmptyResult()
RxStatementResult rs = session.run( "CREATE (n:Person {name:$name})", parameters( "name", "Tom Hanks" ) );

// Then
StepVerifier.create( rs.keys() ).expectComplete().verify();
StepVerifier.create( rs.keys() ).expectNext( emptyList() ).expectComplete().verify();
StepVerifier.create( rs.records() ).expectComplete().verify();
}

Expand All @@ -209,12 +213,12 @@ void shouldOnlyErrorRecordAfterFailure()
RxStatementResult result = session.run( "INVALID" );

// When
Flux<String> keys = Flux.from( result.keys() );
Flux<List<String>> keys = Flux.from( result.keys() );
Flux<Record> records = Flux.from( result.records() );
Mono<ResultSummary> summaryMono = Mono.from( result.consume() );

// Then
StepVerifier.create( keys ).verifyComplete();
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();

StepVerifier.create( records ).expectErrorSatisfies( error -> {
assertThat( error, instanceOf( ClientException.class ) );
Expand All @@ -238,11 +242,11 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable
RxStatementResult result = session.run( "INVALID" );

// When
Flux<String> keys = Flux.from( result.keys() );
Flux<List<String>> keys = Flux.from( result.keys() );
Mono<ResultSummary> summaryMono = Mono.from( result.consume() );

// Then
StepVerifier.create( keys ).verifyComplete();
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();

StepVerifier.create( summaryMono ).expectErrorSatisfies( error -> {
assertThat( error, instanceOf( ClientException.class ) );
Expand Down Expand Up @@ -322,7 +326,7 @@ private void verifyCanAccessFullRecords( RxStatementResult res )

private void verifyCanAccessKeys( RxStatementResult res )
{
StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete();
StepVerifier.create( res.keys() ).expectNext( singletonList( "a" ) ).verifyComplete();
}

private RxStatementResult sessionRunUnwind()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ void shouldExposeStatementKeysForColumnsWithAliases()
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" );

List<String> keys = await( result.keys() );
List<String> keys = await( Mono.from( result.keys() ) );
assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys );

assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed
Expand All @@ -372,7 +372,7 @@ void shouldExposeStatementKeysForColumnsWithoutAliases()
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" );

List<String> keys = await( result.keys() );
List<String> keys = await( Mono.from( result.keys() ) );
assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys );

assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.neo4j.driver.util.StubServer;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.junit.MatcherAssert.assertThat;
Expand Down Expand Up @@ -427,11 +428,11 @@ void shouldDiscardIfPullNotFinished() throws Throwable

try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
{
Flux<String> keys = Flux.usingWhen(
Flux<List<String>> keys = Flux.usingWhen(
Mono.fromSupplier( driver::rxSession ),
session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ),
RxSession::close );
StepVerifier.create( keys ).expectNext( "a" ).verifyComplete();
StepVerifier.create( keys ).expectNext( singletonList( "a" ) ).verifyComplete();
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ void shouldObtainKeys()

// When & Then
StepVerifier.create( Flux.from( rxResult.keys() ) )
.expectNext( "one" )
.expectNext( "two" )
.expectNext( "three" )
.expectNext( Arrays.asList( "one", "two", "three" ) )
.verifyComplete();
}

Expand Down Expand Up @@ -128,7 +126,7 @@ void shouldCancelKeys()

// When & Then
StepVerifier.create( Flux.from( rxResult.keys() ).limitRate( 1 ).take( 1 ) )
.expectNext( "one" )
.expectNext( Arrays.asList( "one", "two", "three" ) )
.verifyComplete();
}

Expand Down