Skip to content

Commit e975b2c

Browse files
author
Zhen Li
committed
Change RxResult#keys to return a single list of keys
1 parent ead87f1 commit e975b2c

File tree

7 files changed

+48
-42
lines changed

7 files changed

+48
-42
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
2424

25+
import java.util.List;
2526
import java.util.concurrent.CompletionStage;
2627
import java.util.function.Supplier;
2728

@@ -44,10 +45,10 @@ public InternalRxStatementResult( Supplier<CompletionStage<RxStatementResultCurs
4445
}
4546

4647
@Override
47-
public Publisher<String> keys()
48+
public Publisher<List<String>> keys()
4849
{
49-
return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() )
50-
.flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
50+
return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).map( RxStatementResultCursor::keys )
51+
.onErrorMap( Futures::completionExceptionCause ) );
5152
}
5253

5354
@Override

driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424

25+
import java.util.List;
26+
2527
import org.neo4j.driver.Record;
2628
import org.neo4j.driver.Statement;
2729
import org.neo4j.driver.summary.ResultSummary;
@@ -43,11 +45,11 @@ public interface RxStatementResult
4345
/**
4446
* Returns a cold publisher of keys.
4547
* <p>
46-
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed.
47-
* This method does not start the record streaming nor publish query execution error.
48+
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the statement is sent to the server and executed.
49+
* This method does not start the record streaming nor publish statement execution result.
4850
* To retrieve the execution result, either {@link #records()} or {@link #summary()} can be used.
49-
* {@link #records()} starts record streaming and reports query execution error.
50-
* {@link #summary()} skips record streaming and directly reports query execution error.
51+
* {@link #records()} starts record streaming and reports statement execution result.
52+
* {@link #summary()} skips record streaming and directly reports statement execution result.
5153
* <p>
5254
* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly.
5355
* Consuming the keys without consuming the execution result will result in resource leak.
@@ -59,23 +61,23 @@ public interface RxStatementResult
5961
* then the buffered keys will be returned.
6062
* @return a cold publisher of keys.
6163
*/
62-
Publisher<String> keys();
64+
Publisher<List<String>> keys();
6365

6466
/**
6567
* Returns a cold unicast publisher of records.
6668
* <p>
6769
* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed},
68-
* the query statement is executed and the query result is streamed back as a record stream followed by a result summary.
70+
* the query statement is executed and the statement result is streamed back as a record stream followed by a result summary.
6971
* This record publisher publishes all records in the result and signals the completion.
7072
* However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically.
7173
* <p>
7274
* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error)
7375
* to ensure that the resources used by this result are released correctly.
74-
* Then the session is ready to be used to run more queries.
76+
* Then the session is ready to be used to run more statements.
7577
* <p>
7678
* Cancelling of the record streaming will immediately terminate the propagation of new records.
77-
* But it will not cancel the query execution.
78-
* As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished.
79+
* But it will not cancel statement execution on the server.
80+
* When the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error).
7981
* <p>
8082
* The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread.
8183
* Otherwise network IO might be blocked by application logic.
@@ -92,12 +94,12 @@ public interface RxStatementResult
9294
/**
9395
* Returns a cold publisher of result summary which arrives after all records.
9496
* <p>
95-
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned.
96-
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion.
97+
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the statement followed by the result summary being returned.
98+
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on statement execution completion.
9799
* As a result, the invocation of {@link #records()} after this method, would receive an empty publisher.
98100
* <p>
99-
* If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client.
100-
* If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records.
101+
* If subscribed after {@link #keys()}, then the result summary will be published after the statement execution without streaming any record to client.
102+
* If subscribed after {@link #records()}, then the result summary will be published after the statement execution and the streaming of records.
101103
* <p>
102104
* Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary.
103105
* <p>

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.CompletionStage;
3535

36+
import org.neo4j.driver.AuthToken;
37+
import org.neo4j.driver.Config;
38+
import org.neo4j.driver.Driver;
39+
import org.neo4j.driver.Logging;
40+
import org.neo4j.driver.Record;
41+
import org.neo4j.driver.Session;
42+
import org.neo4j.driver.StatementResult;
43+
import org.neo4j.driver.StatementRunner;
44+
import org.neo4j.driver.Transaction;
45+
import org.neo4j.driver.exceptions.ClientException;
3646
import org.neo4j.driver.internal.BoltServerAddress;
3747
import org.neo4j.driver.internal.ConnectionSettings;
3848
import org.neo4j.driver.internal.DriverFactory;
@@ -47,23 +57,14 @@
4757
import org.neo4j.driver.internal.spi.ConnectionPool;
4858
import org.neo4j.driver.internal.util.Clock;
4959
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
50-
import org.neo4j.driver.reactive.RxStatementResult;
5160
import org.neo4j.driver.reactive.RxSession;
61+
import org.neo4j.driver.reactive.RxStatementResult;
5262
import org.neo4j.driver.reactive.RxTransaction;
53-
import org.neo4j.driver.AuthToken;
54-
import org.neo4j.driver.Config;
55-
import org.neo4j.driver.Driver;
56-
import org.neo4j.driver.Logging;
57-
import org.neo4j.driver.Record;
58-
import org.neo4j.driver.Session;
59-
import org.neo4j.driver.StatementResult;
60-
import org.neo4j.driver.StatementRunner;
61-
import org.neo4j.driver.Transaction;
62-
import org.neo4j.driver.exceptions.ClientException;
6363
import org.neo4j.driver.summary.ResultSummary;
6464
import org.neo4j.driver.util.DatabaseExtension;
6565
import org.neo4j.driver.util.ParallelizableIT;
6666

67+
import static java.util.Collections.singletonList;
6768
import static org.junit.Assert.assertNotSame;
6869
import static org.junit.Assert.assertNull;
6970
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -75,10 +76,10 @@
7576
import static org.mockito.Mockito.spy;
7677
import static org.mockito.Mockito.times;
7778
import static org.mockito.Mockito.verify;
78-
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
79-
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8079
import static org.neo4j.driver.Config.defaultConfig;
8180
import static org.neo4j.driver.Values.parameters;
81+
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
82+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8283
import static org.neo4j.driver.util.TestUtil.await;
8384

8485
@ParallelizableIT
@@ -317,7 +318,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable
317318
RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" );
318319

319320
// When we only run but not pull
320-
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete();
321+
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( singletonList( "a" ) ).verifyComplete();
321322
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
322323
verify( connection1, never() ).release();
323324

driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.test.StepVerifier;
2626

27+
import java.util.List;
28+
2729
import org.neo4j.driver.Record;
2830
import org.neo4j.driver.exceptions.ClientException;
2931
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
@@ -34,6 +36,7 @@
3436
import org.neo4j.driver.util.DatabaseExtension;
3537
import org.neo4j.driver.util.ParallelizableIT;
3638

39+
import static java.util.Collections.singletonList;
3740
import static org.hamcrest.CoreMatchers.containsString;
3841
import static org.hamcrest.CoreMatchers.instanceOf;
3942
import static org.hamcrest.Matchers.equalTo;
@@ -180,7 +183,7 @@ void shouldHaveFieldNamesInResult()
180183
RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" );
181184

182185
// Then
183-
StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify();
186+
StepVerifier.create( res.keys() ).expectNext( singletonList( "n" ) ).expectComplete().verify();
184187
StepVerifier.create( res.records() )
185188
.assertNext( record -> {
186189
assertEquals( "[n]", record.keys().toString() );
@@ -209,7 +212,7 @@ void shouldOnlyErrorRecordAfterFailure()
209212
RxStatementResult result = session.run( "INVALID" );
210213

211214
// When
212-
Flux<String> keys = Flux.from( result.keys() );
215+
Flux<List<String>> keys = Flux.from( result.keys() );
213216
Flux<Record> records = Flux.from( result.records() );
214217
Mono<ResultSummary> summaryMono = Mono.from( result.summary() );
215218

@@ -238,7 +241,7 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable
238241
RxStatementResult result = session.run( "INVALID" );
239242

240243
// When
241-
Flux<String> keys = Flux.from( result.keys() );
244+
Flux<List<String>> keys = Flux.from( result.keys() );
242245
Mono<ResultSummary> summaryMono = Mono.from( result.summary() );
243246

244247
// Then
@@ -322,7 +325,7 @@ private void verifyCanAccessFullRecords( RxStatementResult res )
322325

323326
private void verifyCanAccessKeys( RxStatementResult res )
324327
{
325-
StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete();
328+
StepVerifier.create( res.keys() ).expectNext( singletonList( "a" ) ).verifyComplete();
326329
}
327330

328331
private RxStatementResult sessionRunUnwind()

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ void shouldExposeStatementKeysForColumnsWithAliases()
361361
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
362362
RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" );
363363

364-
List<String> keys = await( result.keys() );
364+
List<String> keys = await( Mono.from( result.keys() ) );
365365
assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys );
366366

367367
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed
@@ -373,7 +373,7 @@ void shouldExposeStatementKeysForColumnsWithoutAliases()
373373
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
374374
RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" );
375375

376-
List<String> keys = await( result.keys() );
376+
List<String> keys = await( Mono.from( result.keys() ) );
377377
assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys );
378378

379379
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.neo4j.driver.util.StubServer;
5151

5252
import static java.util.Arrays.asList;
53+
import static java.util.Collections.singletonList;
5354
import static java.util.concurrent.TimeUnit.SECONDS;
5455
import static org.hamcrest.core.IsEqual.equalTo;
5556
import static org.hamcrest.junit.MatcherAssert.assertThat;
@@ -426,11 +427,11 @@ void shouldDiscardIfPullNotFinished() throws Throwable
426427

427428
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
428429
{
429-
Flux<String> keys = Flux.using(
430+
Flux<List<String>> keys = Flux.using(
430431
driver::rxSession,
431432
session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ),
432433
RxSession::close );
433-
StepVerifier.create( keys ).expectNext( "a" ).verifyComplete();
434+
StepVerifier.create( keys ).expectNext( singletonList( "a" ) ).verifyComplete();
434435
}
435436
finally
436437
{

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ void shouldObtainKeys()
9797

9898
// When & Then
9999
StepVerifier.create( Flux.from( rxResult.keys() ) )
100-
.expectNext( "one" )
101-
.expectNext( "two" )
102-
.expectNext( "three" )
100+
.expectNext( Arrays.asList( "one", "two", "three" ) )
103101
.verifyComplete();
104102
}
105103

@@ -128,7 +126,7 @@ void shouldCancelKeys()
128126

129127
// When & Then
130128
StepVerifier.create( Flux.from( rxResult.keys() ).limitRate( 1 ).take( 1 ) )
131-
.expectNext( "one" )
129+
.expectNext( Arrays.asList( "one", "two", "three" ) )
132130
.verifyComplete();
133131
}
134132

0 commit comments

Comments
 (0)