Skip to content

Commit d0f9341

Browse files
committed
Async invocation of routing procedures
This commit makes `RoutingProcedureRunner` able to invoke routing procedures asynchronously using given connection future. Server version check is performed to determine which procedure to call.
1 parent 59f0d38 commit d0f9341

File tree

2 files changed

+167
-11
lines changed

2 files changed

+167
-11
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@
2020
package org.neo4j.driver.internal.cluster;
2121

2222
import java.util.List;
23+
import java.util.concurrent.CompletionException;
24+
import java.util.concurrent.CompletionStage;
2325

2426
import org.neo4j.driver.ResultResourcesHandler;
2527
import org.neo4j.driver.internal.NetworkSession;
28+
import org.neo4j.driver.internal.async.AsyncConnection;
29+
import org.neo4j.driver.internal.async.QueryRunner;
2630
import org.neo4j.driver.internal.spi.Connection;
2731
import org.neo4j.driver.v1.Record;
2832
import org.neo4j.driver.v1.Statement;
33+
import org.neo4j.driver.v1.StatementResultCursor;
2934
import org.neo4j.driver.v1.exceptions.ClientException;
3035

3136
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
@@ -47,16 +52,7 @@ public RoutingProcedureRunner( RoutingContext context )
4752

4853
public RoutingProcedureResponse run( Connection connection )
4954
{
50-
Statement procedure;
51-
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
52-
{
53-
procedure = new Statement( "CALL " + GET_ROUTING_TABLE,
54-
parameters( GET_ROUTING_TABLE_PARAM, context.asMap() ) );
55-
}
56-
else
57-
{
58-
procedure = new Statement( "CALL " + GET_SERVERS );
59-
}
55+
Statement procedure = procedureStatement( connection.server().version() );
6056

6157
try
6258
{
@@ -68,8 +64,58 @@ public RoutingProcedureResponse run( Connection connection )
6864
}
6965
}
7066

67+
public CompletionStage<RoutingProcedureResponse> run( CompletionStage<AsyncConnection> connectionStage )
68+
{
69+
return connectionStage.thenCompose( connection ->
70+
{
71+
Statement procedure = procedureStatement( connection.serverInfo().version() );
72+
return runProcedure( connection, procedure ).handle( ( records, error ) ->
73+
{
74+
if ( error != null )
75+
{
76+
return handleError( procedure, error );
77+
}
78+
else
79+
{
80+
return new RoutingProcedureResponse( procedure, records );
81+
}
82+
} );
83+
} );
84+
}
85+
7186
List<Record> runProcedure( Connection connection, Statement procedure )
7287
{
7388
return NetworkSession.run( connection, procedure, ResultResourcesHandler.NO_OP ).list();
7489
}
90+
91+
CompletionStage<List<Record>> runProcedure( AsyncConnection connection, Statement procedure )
92+
{
93+
return QueryRunner.runAsync( connection, procedure )
94+
.thenCompose( StatementResultCursor::listAsync );
95+
}
96+
97+
private Statement procedureStatement( String serverVersionString )
98+
{
99+
if ( version( serverVersionString ).greaterThanOrEqual( v3_2_0 ) )
100+
{
101+
return new Statement( "CALL " + GET_ROUTING_TABLE,
102+
parameters( GET_ROUTING_TABLE_PARAM, context.asMap() ) );
103+
}
104+
else
105+
{
106+
return new Statement( "CALL " + GET_SERVERS );
107+
}
108+
}
109+
110+
private RoutingProcedureResponse handleError( Statement procedure, Throwable error )
111+
{
112+
if ( error instanceof ClientException )
113+
{
114+
return new RoutingProcedureResponse( procedure, error );
115+
}
116+
else
117+
{
118+
throw new CompletionException( error );
119+
}
120+
}
75121
}

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,31 @@
2222

2323
import java.net.URI;
2424
import java.util.List;
25+
import java.util.concurrent.CompletionStage;
2526

27+
import org.neo4j.driver.internal.async.AsyncConnection;
2628
import org.neo4j.driver.internal.net.BoltServerAddress;
2729
import org.neo4j.driver.internal.spi.Connection;
2830
import org.neo4j.driver.internal.summary.InternalServerInfo;
2931
import org.neo4j.driver.v1.Record;
3032
import org.neo4j.driver.v1.Statement;
3133
import org.neo4j.driver.v1.Value;
34+
import org.neo4j.driver.v1.exceptions.ClientException;
3235

36+
import static java.util.Arrays.asList;
3337
import static java.util.Collections.EMPTY_MAP;
34-
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static java.util.Collections.singletonList;
39+
import static java.util.concurrent.CompletableFuture.completedFuture;
3540
import static org.hamcrest.core.IsEqual.equalTo;
41+
import static org.junit.Assert.assertEquals;
42+
import static org.junit.Assert.assertFalse;
43+
import static org.junit.Assert.assertThat;
44+
import static org.junit.Assert.assertTrue;
45+
import static org.junit.Assert.fail;
3646
import static org.mockito.Mockito.mock;
3747
import static org.mockito.Mockito.when;
48+
import static org.neo4j.driver.internal.async.Futures.failedFuture;
49+
import static org.neo4j.driver.internal.async.Futures.getBlocking;
3850
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE;
3951
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE_PARAM;
4052
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_SERVERS;
@@ -58,6 +70,20 @@ public void shouldCallGetRoutingTableWithEmptyMap() throws Throwable
5870
new Statement( "CALL " + GET_ROUTING_TABLE, parameters( GET_ROUTING_TABLE_PARAM, EMPTY_MAP ) ) ) );
5971
}
6072

73+
@Test
74+
public void shouldCallGetRoutingTableWithEmptyMapAsync()
75+
{
76+
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( RoutingContext.EMPTY,
77+
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ) );
78+
79+
RoutingProcedureResponse response = getBlocking( runner.run( connectionStage( "Neo4j/3.2.1" ) ) );
80+
81+
assertTrue( response.isSuccess() );
82+
assertEquals( 2, response.records().size() );
83+
assertEquals( new Statement( "CALL " + GET_ROUTING_TABLE, parameters( GET_ROUTING_TABLE_PARAM, EMPTY_MAP ) ),
84+
response.procedure() );
85+
}
86+
6187
@Test
6288
public void shouldCallGetRoutingTableWithParam() throws Throwable
6389
{
@@ -77,6 +103,23 @@ public void shouldCallGetRoutingTableWithParam() throws Throwable
77103
new Statement( "CALL " + GET_ROUTING_TABLE, expectedParams ) ) );
78104
}
79105

106+
@Test
107+
public void shouldCallGetRoutingTableWithParamAsync()
108+
{
109+
URI uri = URI.create( "bolt+routing://localhost/?key1=value1&key2=value2" );
110+
RoutingContext context = new RoutingContext( uri );
111+
112+
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( context,
113+
completedFuture( singletonList( mock( Record.class ) ) ) );
114+
115+
RoutingProcedureResponse response = getBlocking( runner.run( connectionStage( "Neo4j/3.2.1" ) ) );
116+
117+
assertTrue( response.isSuccess() );
118+
assertEquals( 1, response.records().size() );
119+
Value expectedParams = parameters( GET_ROUTING_TABLE_PARAM, context.asMap() );
120+
assertEquals( new Statement( "CALL " + GET_ROUTING_TABLE, expectedParams ), response.procedure() );
121+
}
122+
80123
@Test
81124
public void shouldCallGetServers() throws Throwable
82125
{
@@ -95,11 +138,72 @@ public void shouldCallGetServers() throws Throwable
95138
new Statement( "CALL " + GET_SERVERS ) ) );
96139
}
97140

141+
@Test
142+
public void shouldCallGetServersAsync()
143+
{
144+
URI uri = URI.create( "bolt+routing://localhost/?key1=value1&key2=value2" );
145+
RoutingContext context = new RoutingContext( uri );
146+
147+
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( context,
148+
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ) );
149+
150+
RoutingProcedureResponse response = getBlocking( runner.run( connectionStage( "Neo4j/3.1.8" ) ) );
151+
152+
assertTrue( response.isSuccess() );
153+
assertEquals( 2, response.records().size() );
154+
assertEquals( new Statement( "CALL " + GET_SERVERS ), response.procedure() );
155+
}
156+
157+
@Test
158+
public void shouldReturnFailedResponseOnClientException()
159+
{
160+
ClientException error = new ClientException( "Hi" );
161+
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) );
162+
163+
RoutingProcedureResponse response = getBlocking( runner.run( connectionStage( "Neo4j/3.2.2" ) ) );
164+
165+
assertFalse( response.isSuccess() );
166+
assertEquals( error, response.error() );
167+
}
168+
169+
@Test
170+
public void shouldReturnFailedStageOnError()
171+
{
172+
Exception error = new Exception( "Hi" );
173+
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) );
174+
175+
try
176+
{
177+
getBlocking( runner.run( connectionStage( "Neo4j/3.2.2" ) ) );
178+
fail( "Exception expected" );
179+
}
180+
catch ( Exception e )
181+
{
182+
assertEquals( error, e );
183+
}
184+
}
185+
186+
private static CompletionStage<AsyncConnection> connectionStage( String serverVersion )
187+
{
188+
AsyncConnection connection = mock( AsyncConnection.class );
189+
InternalServerInfo serverInfo = new InternalServerInfo( new BoltServerAddress( "123:45" ), serverVersion );
190+
when( connection.serverInfo() ).thenReturn( serverInfo );
191+
return completedFuture( connection );
192+
}
193+
98194
private static class TestRoutingProcedureRunner extends RoutingProcedureRunner
99195
{
196+
final CompletionStage<List<Record>> runProcedureResult;
197+
100198
TestRoutingProcedureRunner( RoutingContext context )
199+
{
200+
this( context, null );
201+
}
202+
203+
TestRoutingProcedureRunner( RoutingContext context, CompletionStage<List<Record>> runProcedureResult )
101204
{
102205
super( context );
206+
this.runProcedureResult = runProcedureResult;
103207
}
104208

105209
@Override
@@ -108,6 +212,12 @@ List<Record> runProcedure( Connection connection, Statement procedure )
108212
// I do not want any network traffic
109213
return null;
110214
}
215+
216+
@Override
217+
CompletionStage<List<Record>> runProcedure( AsyncConnection connection, Statement procedure )
218+
{
219+
return runProcedureResult;
220+
}
111221
}
112222

113223
}

0 commit comments

Comments
 (0)