Skip to content

Commit cadda6c

Browse files
author
Zhen Li
committed
Change RxResult#keys to return a single list of keys
This method always publishes one result. If there is no key, then an empty list, otherwise a list of keys inside.
1 parent ba5e5ac commit cadda6c

File tree

8 files changed

+54
-46
lines changed

8 files changed

+54
-46
lines changed

driver/src/main/java/org/neo4j/driver/exceptions/TransactionNestingException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.neo4j.driver.exceptions;
2020

2121
/**
22-
* This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit).
22+
* This exception indicates a user is nesting new transaction with an on-going transaction (explicit and/or auto-commit).
2323
*/
2424
public class TransactionNestingException extends ClientException
2525
{

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
2424

25+
import java.util.List;
2526
import java.util.concurrent.CompletionStage;
2627
import java.util.function.Supplier;
2728

@@ -44,10 +45,10 @@ public InternalRxStatementResult( Supplier<CompletionStage<RxStatementResultCurs
4445
}
4546

4647
@Override
47-
public Publisher<String> keys()
48+
public Publisher<List<String>> keys()
4849
{
49-
return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() )
50-
.flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
50+
return Mono.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).map( RxStatementResultCursor::keys )
51+
.onErrorMap( Futures::completionExceptionCause ) );
5152
}
5253

5354
@Override

driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424

25+
import java.util.List;
26+
2527
import org.neo4j.driver.Record;
2628
import org.neo4j.driver.Statement;
2729
import org.neo4j.driver.exceptions.ResultConsumedException;
@@ -43,12 +45,13 @@ public interface RxStatementResult
4345
{
4446
/**
4547
* Returns a cold publisher of keys.
48+
* This publisher always publishes one item - a list of keys. The list could be empty which indicates no keys in the result.
4649
* <p>
47-
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed.
48-
* This method does not start the record streaming nor publish query execution error.
50+
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the statement is sent to the server and executed.
51+
* This method does not start the record streaming nor publish statement execution result.
4952
* To retrieve the execution result, either {@link #records()} or {@link #consume()} can be used.
50-
* {@link #records()} starts record streaming and reports query execution error.
51-
* {@link #consume()} skips record streaming and directly reports query execution error.
53+
* {@link #records()} starts record streaming and reports statement execution result.
54+
* {@link #consume()} skips record streaming and directly reports statement execution result.
5255
* <p>
5356
* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly.
5457
* Consuming the keys without consuming the execution result will result in resource leak.
@@ -60,23 +63,23 @@ public interface RxStatementResult
6063
* then the buffered keys will be returned.
6164
* @return a cold publisher of keys.
6265
*/
63-
Publisher<String> keys();
66+
Publisher<List<String>> keys();
6467

6568
/**
6669
* Returns a cold unicast publisher of records.
6770
* <p>
6871
* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed},
69-
* the query statement is executed and the query result is streamed back as a record stream followed by a result summary.
72+
* the query statement is executed and the statement result is streamed back as a record stream followed by a result summary.
7073
* This record publisher publishes all records in the result and signals the completion.
7174
* However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically.
7275
* <p>
7376
* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error)
7477
* to ensure that the resources used by this result are released correctly.
75-
* Then the session is ready to be used to run more queries.
78+
* Then the session is ready to be used to run more statements.
7679
* <p>
7780
* Cancelling of the record streaming will immediately terminate the propagation of new records.
78-
* But it will not cancel the query execution.
79-
* As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished.
81+
* But it will not cancel statement execution on the server.
82+
* When the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error).
8083
* <p>
8184
* The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread.
8285
* Otherwise network IO might be blocked by application logic.
@@ -92,12 +95,12 @@ public interface RxStatementResult
9295
/**
9396
* Returns a cold publisher of result summary which arrives after all records.
9497
* <p>
95-
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned.
96-
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion.
98+
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the statement followed by the result summary being returned.
99+
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on statement execution completion.
97100
* As a result, the invocation of {@link #records()} after this method, would receive an {@link ResultConsumedException}.
98101
* <p>
99-
* If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client.
100-
* If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records.
102+
* If subscribed after {@link #keys()}, then the result summary will be published after the statement execution without streaming any record to client.
103+
* If subscribed after {@link #records()}, then the result summary will be published after the statement execution and the streaming of records.
101104
* <p>
102105
* Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary.
103106
* <p>

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.CompletionStage;
3535

36+
import org.neo4j.driver.AuthToken;
37+
import org.neo4j.driver.Config;
38+
import org.neo4j.driver.Driver;
39+
import org.neo4j.driver.Logging;
40+
import org.neo4j.driver.Record;
41+
import org.neo4j.driver.Session;
42+
import org.neo4j.driver.StatementResult;
43+
import org.neo4j.driver.StatementRunner;
44+
import org.neo4j.driver.Transaction;
45+
import org.neo4j.driver.exceptions.ClientException;
3646
import org.neo4j.driver.internal.BoltServerAddress;
3747
import org.neo4j.driver.internal.ConnectionSettings;
3848
import org.neo4j.driver.internal.DriverFactory;
@@ -47,23 +57,14 @@
4757
import org.neo4j.driver.internal.spi.ConnectionPool;
4858
import org.neo4j.driver.internal.util.Clock;
4959
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
50-
import org.neo4j.driver.reactive.RxStatementResult;
5160
import org.neo4j.driver.reactive.RxSession;
61+
import org.neo4j.driver.reactive.RxStatementResult;
5262
import org.neo4j.driver.reactive.RxTransaction;
53-
import org.neo4j.driver.AuthToken;
54-
import org.neo4j.driver.Config;
55-
import org.neo4j.driver.Driver;
56-
import org.neo4j.driver.Logging;
57-
import org.neo4j.driver.Record;
58-
import org.neo4j.driver.Session;
59-
import org.neo4j.driver.StatementResult;
60-
import org.neo4j.driver.StatementRunner;
61-
import org.neo4j.driver.Transaction;
62-
import org.neo4j.driver.exceptions.ClientException;
6363
import org.neo4j.driver.summary.ResultSummary;
6464
import org.neo4j.driver.util.DatabaseExtension;
6565
import org.neo4j.driver.util.ParallelizableIT;
6666

67+
import static java.util.Collections.singletonList;
6768
import static org.junit.Assert.assertNotSame;
6869
import static org.junit.Assert.assertNull;
6970
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -75,10 +76,10 @@
7576
import static org.mockito.Mockito.spy;
7677
import static org.mockito.Mockito.times;
7778
import static org.mockito.Mockito.verify;
78-
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
79-
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8079
import static org.neo4j.driver.Config.defaultConfig;
8180
import static org.neo4j.driver.Values.parameters;
81+
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
82+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
8283
import static org.neo4j.driver.util.TestUtil.await;
8384

8485
@ParallelizableIT
@@ -317,7 +318,7 @@ void sessionCloseShouldReleaseConnectionUsedBySessionRun() throws Throwable
317318
RxStatementResult res = session.run( "UNWIND [1,2,3,4] AS a RETURN a" );
318319

319320
// When we only run but not pull
320-
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( "a" ).verifyComplete();
321+
StepVerifier.create( Flux.from( res.keys() ) ).expectNext( singletonList( "a" ) ).verifyComplete();
321322
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
322323
verify( connection1, never() ).release();
323324

driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.test.StepVerifier;
2626

27+
import java.util.List;
28+
2729
import org.neo4j.driver.Record;
2830
import org.neo4j.driver.exceptions.ClientException;
2931
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
@@ -34,6 +36,8 @@
3436
import org.neo4j.driver.util.DatabaseExtension;
3537
import org.neo4j.driver.util.ParallelizableIT;
3638

39+
import static java.util.Collections.emptyList;
40+
import static java.util.Collections.singletonList;
3741
import static org.hamcrest.CoreMatchers.containsString;
3842
import static org.hamcrest.CoreMatchers.instanceOf;
3943
import static org.hamcrest.Matchers.equalTo;
@@ -180,7 +184,7 @@ void shouldHaveFieldNamesInResult()
180184
RxStatementResult res = session.run( "CREATE (n:TestNode {name:'test'}) RETURN n" );
181185

182186
// Then
183-
StepVerifier.create( res.keys() ).expectNext( "n" ).expectComplete().verify();
187+
StepVerifier.create( res.keys() ).expectNext( singletonList( "n" ) ).expectComplete().verify();
184188
StepVerifier.create( res.records() )
185189
.assertNext( record -> {
186190
assertEquals( "[n]", record.keys().toString() );
@@ -197,7 +201,7 @@ void shouldReturnEmptyKeyAndRecordOnEmptyResult()
197201
RxStatementResult rs = session.run( "CREATE (n:Person {name:$name})", parameters( "name", "Tom Hanks" ) );
198202

199203
// Then
200-
StepVerifier.create( rs.keys() ).expectComplete().verify();
204+
StepVerifier.create( rs.keys() ).expectNext( emptyList() ).expectComplete().verify();
201205
StepVerifier.create( rs.records() ).expectComplete().verify();
202206
}
203207

@@ -209,12 +213,12 @@ void shouldOnlyErrorRecordAfterFailure()
209213
RxStatementResult result = session.run( "INVALID" );
210214

211215
// When
212-
Flux<String> keys = Flux.from( result.keys() );
216+
Flux<List<String>> keys = Flux.from( result.keys() );
213217
Flux<Record> records = Flux.from( result.records() );
214218
Mono<ResultSummary> summaryMono = Mono.from( result.consume() );
215219

216220
// Then
217-
StepVerifier.create( keys ).verifyComplete();
221+
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();
218222

219223
StepVerifier.create( records ).expectErrorSatisfies( error -> {
220224
assertThat( error, instanceOf( ClientException.class ) );
@@ -238,11 +242,11 @@ void shouldErrorOnSummaryIfNoRecord() throws Throwable
238242
RxStatementResult result = session.run( "INVALID" );
239243

240244
// When
241-
Flux<String> keys = Flux.from( result.keys() );
245+
Flux<List<String>> keys = Flux.from( result.keys() );
242246
Mono<ResultSummary> summaryMono = Mono.from( result.consume() );
243247

244248
// Then
245-
StepVerifier.create( keys ).verifyComplete();
249+
StepVerifier.create( keys ).expectNext( emptyList() ).verifyComplete();
246250

247251
StepVerifier.create( summaryMono ).expectErrorSatisfies( error -> {
248252
assertThat( error, instanceOf( ClientException.class ) );
@@ -322,7 +326,7 @@ private void verifyCanAccessFullRecords( RxStatementResult res )
322326

323327
private void verifyCanAccessKeys( RxStatementResult res )
324328
{
325-
StepVerifier.create( res.keys() ).expectNext( "a" ).verifyComplete();
329+
StepVerifier.create( res.keys() ).expectNext( singletonList( "a" ) ).verifyComplete();
326330
}
327331

328332
private RxStatementResult sessionRunUnwind()

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ void shouldExposeStatementKeysForColumnsWithAliases()
360360
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
361361
RxStatementResult result = tx.run( "RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five" );
362362

363-
List<String> keys = await( result.keys() );
363+
List<String> keys = await( Mono.from( result.keys() ) );
364364
assertEquals( Arrays.asList( "one", "two", "three", "five" ), keys );
365365

366366
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed
@@ -372,7 +372,7 @@ void shouldExposeStatementKeysForColumnsWithoutAliases()
372372
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
373373
RxStatementResult result = tx.run( "RETURN 1, 2, 3, 5" );
374374

375-
List<String> keys = await( result.keys() );
375+
List<String> keys = await( Mono.from( result.keys() ) );
376376
assertEquals( Arrays.asList( "1", "2", "3", "5" ), keys );
377377

378378
assertCanRollback( tx ); // you still need to rollback the tx as tx will not automatically closed

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.neo4j.driver.util.StubServer;
5252

5353
import static java.util.Arrays.asList;
54+
import static java.util.Collections.singletonList;
5455
import static java.util.concurrent.TimeUnit.SECONDS;
5556
import static org.hamcrest.core.IsEqual.equalTo;
5657
import static org.hamcrest.junit.MatcherAssert.assertThat;
@@ -427,11 +428,11 @@ void shouldDiscardIfPullNotFinished() throws Throwable
427428

428429
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
429430
{
430-
Flux<String> keys = Flux.usingWhen(
431+
Flux<List<String>> keys = Flux.usingWhen(
431432
Mono.fromSupplier( driver::rxSession ),
432433
session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ),
433434
RxSession::close );
434-
StepVerifier.create( keys ).expectNext( "a" ).verifyComplete();
435+
StepVerifier.create( keys ).expectNext( singletonList( "a" ) ).verifyComplete();
435436
}
436437
finally
437438
{

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxStatementResultTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ void shouldObtainKeys()
9797

9898
// When & Then
9999
StepVerifier.create( Flux.from( rxResult.keys() ) )
100-
.expectNext( "one" )
101-
.expectNext( "two" )
102-
.expectNext( "three" )
100+
.expectNext( Arrays.asList( "one", "two", "three" ) )
103101
.verifyComplete();
104102
}
105103

@@ -128,7 +126,7 @@ void shouldCancelKeys()
128126

129127
// When & Then
130128
StepVerifier.create( Flux.from( rxResult.keys() ).limitRate( 1 ).take( 1 ) )
131-
.expectNext( "one" )
129+
.expectNext( Arrays.asList( "one", "two", "three" ) )
132130
.verifyComplete();
133131
}
134132

0 commit comments

Comments
 (0)