Skip to content

Commit dbaa2e7

Browse files
author
Zhen Li
committed
Change RxResult#keys to return a single list of keys
This method always publishes one result. If there is no key, then an empty list, otherwise a list of keys inside.
1 parent ead87f1 commit dbaa2e7

File tree

7 files changed

+53
-45
lines changed

7 files changed

+53
-45
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
2424

25+
import java.util.List;
2526
import java.util.concurrent.CompletionStage;
2627
import java.util.function.Supplier;
2728

@@ -44,10 +45,10 @@ public InternalRxStatementResult( Supplier<CompletionStage<RxStatementResultCurs
4445
}
4546

4647
@Override
47-
public Publisher<String> keys()
48+
public Publisher<List<String>> keys()
4849
{
49-
return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() )
50-
.flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
50+
return Mono.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).map( RxStatementResultCursor::keys )
51+
.onErrorMap( Futures::completionExceptionCause ) );
5152
}
5253

5354
@Override

driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424

25+
import java.util.List;
26+
2527
import org.neo4j.driver.Record;
2628
import org.neo4j.driver.Statement;
2729
import org.neo4j.driver.summary.ResultSummary;
@@ -42,12 +44,13 @@ public interface RxStatementResult
4244
{
4345
/**
4446
* Returns a cold publisher of keys.
47+
* This publisher always publishes one item - a list of keys. The list could be empty which indicates no keys in the result.
4548
* <p>
46-
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed.
47-
* This method does not start the record streaming nor publish query execution error.
49+
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the statement is sent to the server and executed.
50+
* This method does not start the record streaming nor publish statement execution result.
4851
* To retrieve the execution result, either {@link #records()} or {@link #summary()} can be used.
49-
* {@link #records()} starts record streaming and reports query execution error.
50-
* {@link #summary()} skips record streaming and directly reports query execution error.
52+
* {@link #records()} starts record streaming and reports statement execution result.
53+
* {@link #summary()} skips record streaming and directly reports statement execution result.
5154
* <p>
5255
* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly.
5356
* Consuming the keys without consuming the execution result will result in resource leak.
@@ -59,23 +62,23 @@ public interface RxStatementResult
5962
* then the buffered keys will be returned.
6063
* @return a cold publisher of keys.
6164
*/
62-
Publisher<String> keys();
65+
Publisher<List<String>> keys();
6366

6467
/**
6568
* Returns a cold unicast publisher of records.
6669
* <p>
6770
* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed},
68-
* the query statement is executed and the query result is streamed back as a record stream followed by a result summary.
71+
* the query statement is executed and the statement result is streamed back as a record stream followed by a result summary.
6972
* This record publisher publishes all records in the result and signals the completion.
7073
* However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically.
7174
* <p>
7275
* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error)
7376
* to ensure that the resources used by this result are released correctly.
74-
* Then the session is ready to be used to run more queries.
77+
* Then the session is ready to be used to run more statements.
7578
* <p>
7679
* Cancelling of the record streaming will immediately terminate the propagation of new records.
77-
* But it will not cancel the query execution.
78-
* As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished.
80+
* But it will not cancel statement execution on the server.
81+
* When the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error).
7982
* <p>
8083
* The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread.
8184
* Otherwise network IO might be blocked by application logic.
@@ -92,12 +95,12 @@ public interface RxStatementResult
9295
/**
9396
* Returns a cold publisher of result summary which arrives after all records.
9497
* <p>
95-
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned.
96-
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion.
98+
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the statement followed by the result summary being returned.
99+
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on statement execution completion.
97100
* As a result, the invocation of {@link #records()} after this method, would receive an empty publisher.
98101
* <p>
99-
* If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client.
100-
* If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records.
102+
* If subscribed after {@link #keys()}, then the result summary will be published after the statement execution without streaming any record to client.
103+
* If subscribed after {@link #records()}, then the result summary will be published after the statement execution and the streaming of records.
101104
* <p>
102105
* Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary.
103106
* <p>

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.CompletionStage;
3535

36+
import org.neo4j.driver.AuthToken;
37+
import org.neo4j.driver.Config;
38+
import org.neo4j.driver.Driver;
39+
import org.neo4j.driver.Logging;
40+
import org.neo4j.driver.Record;
41+
import org.neo4j.driver.Session;
42+
import org.neo4j.driver.StatementResult;
43+
import org.neo4j.driver.StatementRunner;
44+
import org.neo4j.driver.Transaction;
45+
import org.neo4j.driver.exceptions.ClientException;
3646
import org.neo4j.driver.internal.BoltServerAddress;
3747
import org.neo4j.driver.internal.ConnectionSettings;
3848
import org.neo4j.driver.internal.DriverFactory;
@@ -47,23 +57,14 @@
4757
import org.neo4j.driver.internal.spi.ConnectionPool;
4858
import org.neo4j.driver.internal.util.Clock;
4959
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
50-
import org.neo4j.driver.reactive.RxStatementResult;
5160
import org.neo4j.driver.reactive.RxSession;
61+
import org.neo4j.driver.reactive.RxStatementResult;
5262
import org.neo4j.driver.reactive.RxTransaction;
53-
import org.neo4j.driver.AuthToken;
54-
import org.neo4j.driver.Config;
55-
import org.neo4j.driver.Driver;
56-
import org.neo4j.driver.Logging;
57-
import org.neo4j.driver.Record;
58-
import org.neo4j.driver.Session;
59-
import org.neo4j.driver.StatementResult;
60-
import org.neo4j.driver.StatementRunner;
61-
import org.neo4j.driver.Transaction;
62-
import org.neo4j.driver.exceptions.ClientException;
6363
import org.neo4j.driver.summary.ResultSummary;
6464
import org.neo4j.driver.util.DatabaseExtension;
6565
import org.neo4j.driver.util.ParallelizableIT;
6666

67+
import static java.util.Collections.singletonList;
6768
import static org.junit.Assert.assertNotSame;
6869
import static org.junit.Assert.assertNull;
6970
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -75,10 +76,10 @@
7576
import static org.mockito.Mockito.spy;
7677
import static org.mockito.Mockito.times;
7778
import static org.mockito.Mockito.verify;
78-
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
79-
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8079
import static org.neo4j.driver.Config.defaultConfig;
8180
import static org.neo4j.driver.Values.parameters;
81+
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
82+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8283
import static org.neo4j.driver.util.TestUtil.await;
8384

8485
@ParallelizableIT
@@ -317,7 +318,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable
317318
RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" );
318319

319320
// When we only run but not pull
320-
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete();
321+
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( singletonList( "a" ) ).verifyComplete();
321322
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
322323
verify( connection1, never() ).release();
323324

driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.test.StepVerifier;
2626

27+
import java.util.List;
28+
2729
import org.neo4j.driver.Record;
2830
import org.neo4j.driver.exceptions.ClientException;
2931
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
@@ -34,6 +36,8 @@
3436
import org.neo4j.driver.util.DatabaseExtension;
3537
import org.neo4j.driver.util.ParallelizableIT;
3638

39+
import static java.util.Collections.emptyList;
40+
import static java.util.Collections.singletonList;
3741
import static org.hamcrest.CoreMatchers.containsString;
3842
import static org.hamcrest.CoreMatchers.instanceOf;
3943
import static org.hamcrest.Matchers.equalTo;
@@ -180,7 +184,7 @@ void shouldHaveFieldNamesInResult()
180184
RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" );
181185

182186
// Then
183-
StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify();
187+
StepVerifier.create( res.keys() ).expectNext( singletonList( "n" ) ).expectComplete().verify();
184188
StepVerifier.create( res.records() )
185189
.assertNext( record -> {
186190
assertEquals( "[n]", record.keys().toString() );
@@ -197,7 +201,7 @@ void shouldReturnEmptyKeyAndRecordOnEmptyResult()
197201
RxStatementResult rs = session.run( "CREATE (n:Person {name:$name})", parameters( "name", "Tom Hanks" ) );
198202

199203
// Then
200-
StepVerifier.create( rs.keys() ).expectComplete().verify();
204+
StepVerifier.create( rs.keys() ).expectNext( emptyList() ).expectComplete().verify();
201205
StepVerifier.create( rs.records() ).expectComplete().verify();
202206
}
203207

@@ -209,12 +213,12 @@ void shouldOnlyErrorRecordAfterFailure()
209213
RxStatementResult result = session.run( "INVALID" );
210214

211215
// When
212-
Flux<String> keys = Flux.from( result.keys() );
216+
Flux<List<String>> keys = Flux.from( result.keys() );
213217
Flux<Record> records = Flux.from( result.records() );
214218
Mono<ResultSummary> summaryMono = Mono.from( result.summary() );
215219

216220
// Then
217-
StepVerifier.create( keys ).verifyComplete();
221+
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();
218222

219223
StepVerifier.create( records ).expectErrorSatisfies( error -> {
220224
assertThat( error, instanceOf( ClientException.class ) );
@@ -238,11 +242,11 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable
238242
RxStatementResult result = session.run( "INVALID" );
239243

240244
// When
241-
Flux<String> keys = Flux.from( result.keys() );
245+
Flux<List<String>> keys = Flux.from( result.keys() );
242246
Mono<ResultSummary> summaryMono = Mono.from( result.summary() );
243247

244248
// Then
245-
StepVerifier.create( keys ).verifyComplete();
249+
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();
246250

247251
StepVerifier.create( summaryMono ).expectErrorSatisfies( error -> {
248252
assertThat( error, instanceOf( ClientException.class ) );
@@ -322,7 +326,7 @@ private void verifyCanAccessFullRecords( RxStatementResult res )
322326

323327
private void verifyCanAccessKeys( RxStatementResult res )
324328
{
325-
StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete();
329+
StepVerifier.create( res.keys() ).expectNext( singletonList( "a" ) ).verifyComplete();
326330
}
327331

328332
private RxStatementResult sessionRunUnwind()

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ void shouldExposeStatementKeysForColumnsWithAliases()
361361
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
362362
RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" );
363363

364-
List<String> keys = await( result.keys() );
364+
List<String> keys = await( Mono.from( result.keys() ) );
365365
assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys );
366366

367367
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed
@@ -373,7 +373,7 @@ void shouldExposeStatementKeysForColumnsWithoutAliases()
373373
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
374374
RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" );
375375

376-
List<String> keys = await( result.keys() );
376+
List<String> keys = await( Mono.from( result.keys() ) );
377377
assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys );
378378

379379
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.neo4j.driver.util.StubServer;
5151

5252
import static java.util.Arrays.asList;
53+
import static java.util.Collections.singletonList;
5354
import static java.util.concurrent.TimeUnit.SECONDS;
5455
import static org.hamcrest.core.IsEqual.equalTo;
5556
import static org.hamcrest.junit.MatcherAssert.assertThat;
@@ -426,11 +427,11 @@ void shouldDiscardIfPullNotFinished() throws Throwable
426427

427428
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
428429
{
429-
Flux<String> keys = Flux.using(
430+
Flux<List<String>> keys = Flux.using(
430431
driver::rxSession,
431432
session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ),
432433
RxSession::close );
433-
StepVerifier.create( keys ).expectNext( "a" ).verifyComplete();
434+
StepVerifier.create( keys ).expectNext( singletonList( "a" ) ).verifyComplete();
434435
}
435436
finally
436437
{

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ void shouldObtainKeys()
9797

9898
// When & Then
9999
StepVerifier.create( Flux.from( rxResult.keys() ) )
100-
.expectNext( "one" )
101-
.expectNext( "two" )
102-
.expectNext( "three" )
100+
.expectNext( Arrays.asList( "one", "two", "three" ) )
103101
.verifyComplete();
104102
}
105103

@@ -128,7 +126,7 @@ void shouldCancelKeys()
128126

129127
// When & Then
130128
StepVerifier.create( Flux.from( rxResult.keys() ).limitRate( 1 ).take( 1 ) )
131-
.expectNext( "one" )
129+
.expectNext( Arrays.asList( "one", "two", "three" ) )
132130
.verifyComplete();
133131
}
134132

0 commit comments

Comments
 (0)