Skip to content

Commit 5458c82

Browse files
committed
Docs, tests and better method names in QueryRunner
1 parent 8d61cb8 commit 5458c82

File tree

5 files changed

+214
-48
lines changed

5 files changed

+214
-48
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,18 +285,12 @@ public CompletionStage<StatementResultCursor> runAsync( Statement statement )
285285
return (CompletionStage) run( statement, true );
286286
}
287287

288-
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
288+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
289289
{
290290
ensureCanRunQueries();
291-
CompletionStage<InternalStatementResultCursor> cursorStage;
292-
if ( asAsync )
293-
{
294-
cursorStage = QueryRunner.runAsAsync( connection, statement, this );
295-
}
296-
else
297-
{
298-
cursorStage = QueryRunner.runAsBlocking( connection, statement, this );
299-
}
291+
CompletionStage<InternalStatementResultCursor> cursorStage =
292+
QueryRunner.runInTransaction( connection, statement,
293+
this, waitForRunResponse );
300294
resultCursors.add( cursorStage );
301295
return cursorStage;
302296
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
132132
@Override
133133
public StatementResult run( Statement statement )
134134
{
135-
StatementResultCursor cursor = blockingGet( runAsync( statement, false ) );
135+
StatementResultCursor cursor = blockingGet( run( statement, false ) );
136136
return new InternalStatementResult( cursor );
137137
}
138138

139139
@Override
140140
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
141141
{
142142
//noinspection unchecked
143-
return (CompletionStage) runAsync( statement, true );
143+
return (CompletionStage) run( statement, true );
144144
}
145145

146146
@Override
@@ -410,23 +410,13 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
410410
}
411411
}
412412

413-
private CompletionStage<InternalStatementResultCursor> runAsync( Statement statement, boolean waitForRunResponse )
413+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
414414
{
415415
ensureSessionIsOpen();
416416

417417
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
418418
.thenCompose( ignore -> acquireConnection( mode ) )
419-
.thenCompose( connection ->
420-
{
421-
if ( waitForRunResponse )
422-
{
423-
return QueryRunner.runAsAsync( connection, statement );
424-
}
425-
else
426-
{
427-
return QueryRunner.runAsBlocking( connection, statement );
428-
}
429-
} );
419+
.thenCompose( connection -> QueryRunner.runInSession( connection, statement, waitForRunResponse ) );
430420

431421
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
432422

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,45 +29,60 @@
2929
import org.neo4j.driver.internal.handlers.SessionPullAllResponseHandler;
3030
import org.neo4j.driver.internal.handlers.TransactionPullAllResponseHandler;
3131
import org.neo4j.driver.internal.spi.Connection;
32+
import org.neo4j.driver.v1.Session;
3233
import org.neo4j.driver.v1.Statement;
34+
import org.neo4j.driver.v1.Transaction;
3335
import org.neo4j.driver.v1.Value;
3436

3537
import static java.util.concurrent.CompletableFuture.completedFuture;
3638
import static org.neo4j.driver.v1.Values.ofValue;
3739

38-
// todo: better method naming in this class and tests!
40+
/**
41+
* Helper to execute queries in {@link Session} and {@link Transaction}. Query execution consists of sending
42+
* RUN and PULL_ALL messages. Different handles are used to process responses for those messages, depending on if
43+
* they were executed in session or transaction.
44+
*/
3945
public final class QueryRunner
4046
{
4147
private QueryRunner()
4248
{
4349
}
4450

45-
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
46-
Statement statement )
47-
{
48-
return runAsBlocking( connection, statement, null );
49-
}
50-
51-
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
52-
Statement statement, ExplicitTransaction tx )
53-
{
54-
return runAsAsync( connection, statement, tx, false );
55-
}
56-
57-
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
58-
Statement statement )
51+
/**
52+
* Execute given statement for {@link Session#run(Statement)}.
53+
*
54+
* @param connection the network connection to use.
55+
* @param statement the cypher to execute.
56+
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
57+
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
58+
* keys populated.
59+
* @return stage with cursor.
60+
*/
61+
public static CompletionStage<InternalStatementResultCursor> runInSession( Connection connection,
62+
Statement statement, boolean waitForRunResponse )
5963
{
60-
return runAsAsync( connection, statement, null );
64+
return run( connection, statement, null, waitForRunResponse );
6165
}
6266

63-
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
64-
Statement statement, ExplicitTransaction tx )
67+
/**
68+
* Execute given statement for {@link Transaction#run(Statement)}.
69+
*
70+
* @param connection the network connection to use.
71+
* @param statement the cypher to execute.
72+
* @param tx the transaction which executes the query.
73+
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
74+
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
75+
* keys populated.
76+
* @return stage with cursor.
77+
*/
78+
public static CompletionStage<InternalStatementResultCursor> runInTransaction( Connection connection,
79+
Statement statement, ExplicitTransaction tx, boolean waitForRunResponse )
6580
{
66-
return runAsAsync( connection, statement, tx, true );
81+
return run( connection, statement, tx, waitForRunResponse );
6782
}
6883

69-
private static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
70-
Statement statement, ExplicitTransaction tx, boolean async )
84+
private static CompletionStage<InternalStatementResultCursor> run( Connection connection,
85+
Statement statement, ExplicitTransaction tx, boolean waitForRunResponse )
7186
{
7287
String query = statement.text();
7388
Map<String,Value> params = statement.parameters().asMap( ofValue() );
@@ -78,9 +93,9 @@ private static CompletionStage<InternalStatementResultCursor> runAsAsync( Connec
7893

7994
connection.runAndFlush( query, params, runHandler, pullAllHandler );
8095

81-
if ( async )
96+
if ( waitForRunResponse )
8297
{
83-
// wait for response of RUN before proceeding when execution is async
98+
// wait for response of RUN before proceeding
8499
return runCompletedFuture.thenApply( ignore ->
85100
InternalStatementResultCursor.forAsyncRun( runHandler, pullAllHandler ) );
86101
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6161

6262
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
6363
{
64-
return QueryRunner.runAsAsync( connection, procedure )
64+
return QueryRunner.runInSession( connection, procedure, true )
6565
.thenCompose( StatementResultCursor::listAsync );
6666
}
6767

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import org.junit.Test;
22+
import org.mockito.ArgumentCaptor;
23+
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionStage;
27+
28+
import org.neo4j.driver.internal.ExplicitTransaction;
29+
import org.neo4j.driver.internal.InternalStatementResultCursor;
30+
import org.neo4j.driver.internal.handlers.RunResponseHandler;
31+
import org.neo4j.driver.internal.handlers.SessionPullAllResponseHandler;
32+
import org.neo4j.driver.internal.handlers.TransactionPullAllResponseHandler;
33+
import org.neo4j.driver.internal.spi.Connection;
34+
import org.neo4j.driver.internal.spi.ResponseHandler;
35+
import org.neo4j.driver.v1.Statement;
36+
import org.neo4j.driver.v1.Value;
37+
38+
import static java.util.Collections.emptyMap;
39+
import static java.util.Collections.singletonMap;
40+
import static org.hamcrest.Matchers.instanceOf;
41+
import static org.junit.Assert.assertFalse;
42+
import static org.junit.Assert.assertNotNull;
43+
import static org.junit.Assert.assertThat;
44+
import static org.junit.Assert.assertTrue;
45+
import static org.mockito.Matchers.eq;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.verify;
48+
import static org.neo4j.driver.internal.async.QueryRunner.runInSession;
49+
import static org.neo4j.driver.internal.async.QueryRunner.runInTransaction;
50+
import static org.neo4j.driver.v1.Values.value;
51+
52+
public class QueryRunnerTest
53+
{
54+
private static final String QUERY = "RETURN $x";
55+
private static final Map<String,Value> PARAMS = singletonMap( "x", value( 42 ) );
56+
private static final Statement STATEMENT = new Statement( QUERY, value( PARAMS ) );
57+
58+
@Test
59+
public void shouldRunInSessionWithoutWaitingForRunResponse() throws Exception
60+
{
61+
testNotWaitingForRunResponse( true );
62+
}
63+
64+
@Test
65+
public void shouldRunInSessionAndWaitForSuccessRunResponse() throws Exception
66+
{
67+
testWaitingForRunResponse( true, true );
68+
}
69+
70+
@Test
71+
public void shouldRunInSessionAndWaitForFailureRunResponse() throws Exception
72+
{
73+
testWaitingForRunResponse( false, true );
74+
}
75+
76+
@Test
77+
public void shouldRunInTransactionWithoutWaitingForRunResponse() throws Exception
78+
{
79+
testNotWaitingForRunResponse( false );
80+
}
81+
82+
@Test
83+
public void shouldRunInTransactionAndWaitForSuccessRunResponse() throws Exception
84+
{
85+
testWaitingForRunResponse( true, false );
86+
}
87+
88+
@Test
89+
public void shouldRunInTransactionAndWaitForFailureRunResponse() throws Exception
90+
{
91+
testWaitingForRunResponse( false, false );
92+
}
93+
94+
private static void testNotWaitingForRunResponse( boolean session ) throws Exception
95+
{
96+
Connection connection = mock( Connection.class );
97+
98+
CompletionStage<InternalStatementResultCursor> cursorStage;
99+
if ( session )
100+
{
101+
102+
cursorStage = runInSession( connection, STATEMENT, false );
103+
}
104+
else
105+
{
106+
cursorStage = runInTransaction( connection, STATEMENT, mock( ExplicitTransaction.class ), false );
107+
}
108+
CompletableFuture<InternalStatementResultCursor> cursorFuture = cursorStage.toCompletableFuture();
109+
110+
assertTrue( cursorFuture.isDone() );
111+
assertNotNull( cursorFuture.get() );
112+
verifyRunInvoked( connection, session );
113+
}
114+
115+
private static void testWaitingForRunResponse( boolean success, boolean session ) throws Exception
116+
{
117+
Connection connection = mock( Connection.class );
118+
119+
CompletionStage<InternalStatementResultCursor> cursorStage;
120+
if ( session )
121+
{
122+
cursorStage = runInSession( connection, STATEMENT, true );
123+
}
124+
else
125+
{
126+
cursorStage = runInTransaction( connection, STATEMENT, mock( ExplicitTransaction.class ), true );
127+
}
128+
CompletableFuture<InternalStatementResultCursor> cursorFuture = cursorStage.toCompletableFuture();
129+
130+
assertFalse( cursorFuture.isDone() );
131+
ResponseHandler runResponseHandler = verifyRunInvoked( connection, session );
132+
133+
if ( success )
134+
{
135+
runResponseHandler.onSuccess( emptyMap() );
136+
}
137+
else
138+
{
139+
runResponseHandler.onFailure( new RuntimeException() );
140+
}
141+
142+
assertTrue( cursorFuture.isDone() );
143+
assertNotNull( cursorFuture.get() );
144+
}
145+
146+
private static ResponseHandler verifyRunInvoked( Connection connection, boolean session )
147+
{
148+
ArgumentCaptor<ResponseHandler> runHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
149+
ArgumentCaptor<ResponseHandler> pullAllHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
150+
151+
verify( connection ).runAndFlush( eq( QUERY ), eq( PARAMS ),
152+
runHandlerCaptor.capture(), pullAllHandlerCaptor.capture() );
153+
154+
assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) );
155+
156+
if ( session )
157+
{
158+
assertThat( pullAllHandlerCaptor.getValue(), instanceOf( SessionPullAllResponseHandler.class ) );
159+
}
160+
else
161+
{
162+
assertThat( pullAllHandlerCaptor.getValue(), instanceOf( TransactionPullAllResponseHandler.class ) );
163+
}
164+
165+
return runHandlerCaptor.getValue();
166+
}
167+
}

0 commit comments

Comments
 (0)