Skip to content

Commit 2f3648c

Browse files
authored
Merge pull request #445 from lutovich/1.5-fix-todos
Addressed couple TODOs
2 parents 2d2df51 + 77f5f38 commit 2f3648c

26 files changed

+935
-133
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.CompletionException;
2423
import java.util.concurrent.CompletionStage;
2524
import java.util.function.BiConsumer;
2625
import java.util.function.BiFunction;
@@ -49,7 +48,6 @@
4948
import static java.util.Collections.emptyMap;
5049
import static java.util.concurrent.CompletableFuture.completedFuture;
5150
import static org.neo4j.driver.internal.util.Futures.blockingGet;
52-
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
5351
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5452
import static org.neo4j.driver.v1.Values.value;
5553

@@ -122,7 +120,7 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
122120
{
123121
// release connection if begin failed, transaction can't be started
124122
connection.release();
125-
throw new CompletionException( Futures.completionErrorCause( beginError ) );
123+
throw Futures.asCompletionException( beginError );
126124
}
127125
return tx;
128126
} );
@@ -285,18 +283,12 @@ public CompletionStage<StatementResultCursor> runAsync( Statement statement )
285283
return (CompletionStage) run( statement, true );
286284
}
287285

288-
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
286+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
289287
{
290288
ensureCanRunQueries();
291-
CompletionStage<InternalStatementResultCursor> cursorStage;
292-
if ( asAsync )
293-
{
294-
cursorStage = QueryRunner.runAsAsync( connection, statement, this );
295-
}
296-
else
297-
{
298-
cursorStage = QueryRunner.runAsBlocking( connection, statement, this );
299-
}
289+
CompletionStage<InternalStatementResultCursor> cursorStage =
290+
QueryRunner.runInTransaction( connection, statement,
291+
this, waitForRunResponse );
300292
resultCursors.add( cursorStage );
301293
return cursorStage;
302294
}
@@ -378,11 +370,11 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
378370
{
379371
if ( cursorFailure != null )
380372
{
381-
throw new CompletionException( completionErrorCause( cursorFailure ) );
373+
throw Futures.asCompletionException( cursorFailure );
382374
}
383375
else if ( commitOrRollbackError != null )
384376
{
385-
throw new CompletionException( completionErrorCause( commitOrRollbackError ) );
377+
throw Futures.asCompletionException( commitOrRollbackError );
386378
}
387379
else
388380
{

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ public CompletionStage<Void> closeAsync()
119119
return completedFuture( null );
120120
}
121121

122-
// todo: test this method and it's usage in DriverFactory
123122
public CompletionStage<Void> verifyConnectivity()
124123
{
125124
return sessionFactory.verifyConnectivity();

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,37 +34,17 @@
3434
import org.neo4j.driver.v1.util.Function;
3535
import org.neo4j.driver.v1.util.Functions;
3636

37-
// todo: unit tests
3837
public class InternalStatementResultCursor implements StatementResultCursor
3938
{
40-
// todo: maybe smth better than these two string constants?
41-
private static final String BLOCKING_NAME = "result";
42-
private static final String ASYNC_NAME = "cursor";
43-
44-
private final String name;
4539
private final RunResponseHandler runResponseHandler;
4640
private final PullAllResponseHandler pullAllHandler;
4741

48-
private InternalStatementResultCursor( String name, RunResponseHandler runResponseHandler,
49-
PullAllResponseHandler pullAllHandler )
42+
public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
5043
{
51-
this.name = name;
5244
this.runResponseHandler = runResponseHandler;
5345
this.pullAllHandler = pullAllHandler;
5446
}
5547

56-
public static InternalStatementResultCursor forBlockingRun( RunResponseHandler runResponseHandler,
57-
PullAllResponseHandler pullAllHandler )
58-
{
59-
return new InternalStatementResultCursor( BLOCKING_NAME, runResponseHandler, pullAllHandler );
60-
}
61-
62-
public static InternalStatementResultCursor forAsyncRun( RunResponseHandler runResponseHandler,
63-
PullAllResponseHandler pullAllHandler )
64-
{
65-
return new InternalStatementResultCursor( ASYNC_NAME, runResponseHandler, pullAllHandler );
66-
}
67-
6848
@Override
6949
public List<String> keys()
7050
{
@@ -97,14 +77,14 @@ public CompletionStage<Record> singleAsync()
9777
if ( firstRecord == null )
9878
{
9979
throw new NoSuchRecordException(
100-
"Cannot retrieve a single record, because this " + name + " is empty." );
80+
"Cannot retrieve a single record, because this result is empty." );
10181
}
10282
return nextAsync().thenApply( secondRecord ->
10383
{
10484
if ( secondRecord != null )
10585
{
10686
throw new NoSuchRecordException(
107-
"Expected a " + name + " with a single record, but this " + name + " " +
87+
"Expected a result with a single record, but this result " +
10888
"contains at least one more. Ensure your query returns only " +
10989
"one record." );
11090
}
@@ -156,7 +136,7 @@ private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Vo
156136
// the caller thread to get StackOverflowError when result is large and buffered
157137
recordFuture.whenCompleteAsync( ( record, completionError ) ->
158138
{
159-
Throwable error = Futures.completionErrorCause( completionError );
139+
Throwable error = Futures.completionExceptionCause( completionError );
160140
if ( error != null )
161141
{
162142
resultFuture.completeExceptionally( error );
@@ -190,7 +170,7 @@ private <T> void internalListAsync( List<T> result, CompletableFuture<List<T>> r
190170
// the caller thread to get StackOverflowError when result is large and buffered
191171
recordFuture.whenCompleteAsync( ( record, completionError ) ->
192172
{
193-
Throwable error = Futures.completionErrorCause( completionError );
173+
Throwable error = Futures.completionExceptionCause( completionError );
194174
if ( error != null )
195175
{
196176
resultFuture.completeExceptionally( error );

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
132132
@Override
133133
public StatementResult run( Statement statement )
134134
{
135-
StatementResultCursor cursor = blockingGet( runAsync( statement, false ) );
135+
StatementResultCursor cursor = blockingGet( run( statement, false ) );
136136
return new InternalStatementResult( cursor );
137137
}
138138

139139
@Override
140140
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
141141
{
142142
//noinspection unchecked
143-
return (CompletionStage) runAsync( statement, true );
143+
return (CompletionStage) run( statement, true );
144144
}
145145

146146
@Override
@@ -169,11 +169,10 @@ public CompletionStage<Void> closeAsync()
169169
return cursor.failureAsync();
170170
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
171171
{
172-
Throwable queryError = Futures.completionErrorCause( error );
173-
if ( queryError != null )
172+
if ( error != null )
174173
{
175174
// connection has been acquired and there is an unconsumed error in result cursor
176-
throw new CompletionException( queryError );
175+
throw Futures.asCompletionException( error );
177176
}
178177
else
179178
{
@@ -316,7 +315,7 @@ private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWor
316315

317316
txFuture.whenComplete( ( tx, completionError ) ->
318317
{
319-
Throwable error = Futures.completionErrorCause( completionError );
318+
Throwable error = Futures.completionExceptionCause( completionError );
320319
if ( error != null )
321320
{
322321
resultFuture.completeExceptionally( error );
@@ -337,7 +336,7 @@ private <T> void executeWork( CompletableFuture<T> resultFuture, ExplicitTransac
337336
CompletionStage<T> workFuture = safeExecuteWork( tx, work );
338337
workFuture.whenComplete( ( result, completionError ) ->
339338
{
340-
Throwable error = Futures.completionErrorCause( completionError );
339+
Throwable error = Futures.completionExceptionCause( completionError );
341340
if ( error != null )
342341
{
343342
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
@@ -393,7 +392,7 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
393392
tx.success();
394393
tx.closeAsync().whenComplete( ( ignore, completionError ) ->
395394
{
396-
Throwable commitError = Futures.completionErrorCause( completionError );
395+
Throwable commitError = Futures.completionExceptionCause( completionError );
397396
if ( commitError != null )
398397
{
399398
resultFuture.completeExceptionally( commitError );
@@ -410,23 +409,13 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
410409
}
411410
}
412411

413-
private CompletionStage<InternalStatementResultCursor> runAsync( Statement statement, boolean waitForRunResponse )
412+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
414413
{
415414
ensureSessionIsOpen();
416415

417416
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
418417
.thenCompose( ignore -> acquireConnection( mode ) )
419-
.thenCompose( connection ->
420-
{
421-
if ( waitForRunResponse )
422-
{
423-
return QueryRunner.runAsAsync( connection, statement );
424-
}
425-
else
426-
{
427-
return QueryRunner.runAsBlocking( connection, statement );
428-
}
429-
} );
418+
.thenCompose( connection -> QueryRunner.runInSession( connection, statement, waitForRunResponse ) );
430419

431420
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
432421

@@ -508,7 +497,7 @@ private CompletionStage<Void> rollbackTransaction()
508497
return completedFuture( null );
509498
} ).exceptionally( error ->
510499
{
511-
Throwable cause = Futures.completionErrorCause( error );
500+
Throwable cause = Futures.completionExceptionCause( error );
512501
logger.warn( "Active transaction rolled back with an error", cause );
513502
return null;
514503
} );

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,45 +29,60 @@
2929
import org.neo4j.driver.internal.handlers.SessionPullAllResponseHandler;
3030
import org.neo4j.driver.internal.handlers.TransactionPullAllResponseHandler;
3131
import org.neo4j.driver.internal.spi.Connection;
32+
import org.neo4j.driver.v1.Session;
3233
import org.neo4j.driver.v1.Statement;
34+
import org.neo4j.driver.v1.Transaction;
3335
import org.neo4j.driver.v1.Value;
3436

3537
import static java.util.concurrent.CompletableFuture.completedFuture;
3638
import static org.neo4j.driver.v1.Values.ofValue;
3739

38-
// todo: better method naming in this class and tests!
40+
/**
41+
* Helper to execute queries in {@link Session} and {@link Transaction}. Query execution consists of sending
42+
* RUN and PULL_ALL messages. Different handles are used to process responses for those messages, depending on if
43+
* they were executed in session or transaction.
44+
*/
3945
public final class QueryRunner
4046
{
4147
private QueryRunner()
4248
{
4349
}
4450

45-
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
46-
Statement statement )
47-
{
48-
return runAsBlocking( connection, statement, null );
49-
}
50-
51-
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
52-
Statement statement, ExplicitTransaction tx )
53-
{
54-
return runAsAsync( connection, statement, tx, false );
55-
}
56-
57-
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
58-
Statement statement )
51+
/**
52+
* Execute given statement for {@link Session#run(Statement)}.
53+
*
54+
* @param connection the network connection to use.
55+
* @param statement the cypher to execute.
56+
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
57+
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
58+
* keys populated.
59+
* @return stage with cursor.
60+
*/
61+
public static CompletionStage<InternalStatementResultCursor> runInSession( Connection connection,
62+
Statement statement, boolean waitForRunResponse )
5963
{
60-
return runAsAsync( connection, statement, null );
64+
return run( connection, statement, null, waitForRunResponse );
6165
}
6266

63-
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
64-
Statement statement, ExplicitTransaction tx )
67+
/**
68+
* Execute given statement for {@link Transaction#run(Statement)}.
69+
*
70+
* @param connection the network connection to use.
71+
* @param statement the cypher to execute.
72+
* @param tx the transaction which executes the query.
73+
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
74+
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
75+
* keys populated.
76+
* @return stage with cursor.
77+
*/
78+
public static CompletionStage<InternalStatementResultCursor> runInTransaction( Connection connection,
79+
Statement statement, ExplicitTransaction tx, boolean waitForRunResponse )
6580
{
66-
return runAsAsync( connection, statement, tx, true );
81+
return run( connection, statement, tx, waitForRunResponse );
6782
}
6883

69-
private static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
70-
Statement statement, ExplicitTransaction tx, boolean async )
84+
private static CompletionStage<InternalStatementResultCursor> run( Connection connection,
85+
Statement statement, ExplicitTransaction tx, boolean waitForRunResponse )
7186
{
7287
String query = statement.text();
7388
Map<String,Value> params = statement.parameters().asMap( ofValue() );
@@ -78,15 +93,15 @@ private static CompletionStage<InternalStatementResultCursor> runAsAsync( Connec
7893

7994
connection.runAndFlush( query, params, runHandler, pullAllHandler );
8095

81-
if ( async )
96+
if ( waitForRunResponse )
8297
{
83-
// wait for response of RUN before proceeding when execution is async
98+
// wait for response of RUN before proceeding
8499
return runCompletedFuture.thenApply( ignore ->
85-
InternalStatementResultCursor.forAsyncRun( runHandler, pullAllHandler ) );
100+
new InternalStatementResultCursor( runHandler, pullAllHandler ) );
86101
}
87102
else
88103
{
89-
return completedFuture( InternalStatementResultCursor.forBlockingRun( runHandler, pullAllHandler ) );
104+
return completedFuture( new InternalStatementResultCursor( runHandler, pullAllHandler ) );
90105
}
91106
}
92107

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ private EventLoopGroup eventLoopGroup()
180180

181181
private void processAcquisitionError( Throwable error )
182182
{
183-
Throwable cause = Futures.completionErrorCause( error );
183+
Throwable cause = Futures.completionExceptionCause( error );
184184
if ( cause != null )
185185
{
186186
if ( cause instanceof TimeoutException )

0 commit comments

Comments
 (0)