Skip to content

Commit 13b5177

Browse files
committed
Refactor RoutingProcedureRunner
To return both result and invoked procedure at the same time. Previously procedure had to be accessed separately via getter. This is a preparation for async, where full result will make it easier to chain futures.
1 parent 24e87be commit 13b5177

File tree

6 files changed

+242
-44
lines changed

6 files changed

+242
-44
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.neo4j.driver.v1.Logger;
2626
import org.neo4j.driver.v1.Record;
2727
import org.neo4j.driver.v1.Statement;
28-
import org.neo4j.driver.v1.exceptions.ClientException;
2928
import org.neo4j.driver.v1.exceptions.ProtocolException;
3029
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3130
import org.neo4j.driver.v1.exceptions.value.ValueException;
@@ -38,39 +37,36 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos
3837

3938
private final Clock clock;
4039
private final Logger log;
41-
private final RoutingProcedureRunner getServersRunner;
40+
private final RoutingProcedureRunner routingProcedureRunner;
4241

4342
public RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
4443
{
4544
this( clock, log, new RoutingProcedureRunner( settings.routingContext() ) );
4645
}
4746

48-
RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingProcedureRunner getServersRunner )
47+
RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingProcedureRunner routingProcedureRunner )
4948
{
5049
this.clock = clock;
5150
this.log = log;
52-
this.getServersRunner = getServersRunner;
51+
this.routingProcedureRunner = routingProcedureRunner;
5352
}
5453

5554
@Override
5655
public ClusterCompositionResponse getClusterComposition( Connection connection )
5756
{
58-
List<Record> records;
57+
RoutingProcedureResponse response = routingProcedureRunner.run( connection );
5958

60-
// failed to invoke procedure
61-
try
62-
{
63-
records = getServersRunner.run( connection );
64-
}
65-
catch ( ClientException e )
59+
if ( !response.isSuccess() )
6660
{
6761
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
6862
"Failed to run '%s' on server. " +
6963
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
70-
invokedProcedureString() ), e
64+
invokedProcedureString( response ) ), response.error()
7165
) );
7266
}
7367

68+
List<Record> records = response.records();
69+
7470
log.info( "Got getServers response: %s", records );
7571
long now = clock.millis();
7672

@@ -79,7 +75,7 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
7975
{
8076
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
8177
PROTOCOL_ERROR_MESSAGE + "records received '%s' is too few or too many.",
82-
invokedProcedureString(), records.size() ) ) );
78+
invokedProcedureString( response ), records.size() ) ) );
8379
}
8480

8581
// failed to parse the record
@@ -92,24 +88,24 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
9288
{
9389
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
9490
PROTOCOL_ERROR_MESSAGE + "unparsable record received.",
95-
invokedProcedureString() ), e ) );
91+
invokedProcedureString( response ) ), e ) );
9692
}
9793

9894
// the cluster result is not a legal reply
9995
if ( !cluster.hasRoutersAndReaders() )
10096
{
10197
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
10298
PROTOCOL_ERROR_MESSAGE + "no router or reader found in response.",
103-
invokedProcedureString() ) ) );
99+
invokedProcedureString( response ) ) ) );
104100
}
105101

106102
// all good
107103
return new ClusterCompositionResponse.Success( cluster );
108104
}
109105

110-
private String invokedProcedureString()
106+
private static String invokedProcedureString( RoutingProcedureResponse response )
111107
{
112-
Statement statement = getServersRunner.invokedProcedure();
108+
Statement statement = response.procedure();
113109
return statement.text() + " " + statement.parameters();
114110
}
115111
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.List;
22+
23+
import org.neo4j.driver.v1.Record;
24+
import org.neo4j.driver.v1.Statement;
25+
26+
public class RoutingProcedureResponse
27+
{
28+
private final Statement procedure;
29+
private final List<Record> records;
30+
private final Throwable error;
31+
32+
public RoutingProcedureResponse( Statement procedure, List<Record> records )
33+
{
34+
this( procedure, records, null );
35+
}
36+
37+
public RoutingProcedureResponse( Statement procedure, Throwable error )
38+
{
39+
this( procedure, null, error );
40+
}
41+
42+
private RoutingProcedureResponse( Statement procedure, List<Record> records, Throwable error )
43+
{
44+
this.procedure = procedure;
45+
this.records = records;
46+
this.error = error;
47+
}
48+
49+
public boolean isSuccess()
50+
{
51+
return records != null;
52+
}
53+
54+
public Statement procedure()
55+
{
56+
return procedure;
57+
}
58+
59+
public List<Record> records()
60+
{
61+
if ( !isSuccess() )
62+
{
63+
throw new IllegalStateException( "Can't access records of a failed result", error );
64+
}
65+
return records;
66+
}
67+
68+
public Throwable error()
69+
{
70+
if ( isSuccess() )
71+
{
72+
throw new IllegalStateException( "Can't access error of a succeeded result " + records );
73+
}
74+
return error;
75+
}
76+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.neo4j.driver.internal.spi.Connection;
2727
import org.neo4j.driver.v1.Record;
2828
import org.neo4j.driver.v1.Statement;
29+
import org.neo4j.driver.v1.exceptions.ClientException;
2930

3031
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
3132
import static org.neo4j.driver.internal.util.ServerVersion.version;
@@ -38,35 +39,37 @@ public class RoutingProcedureRunner
3839
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + GET_ROUTING_TABLE_PARAM + "})";
3940

4041
private final RoutingContext context;
41-
private Statement invokedProcedure;
4242

4343
public RoutingProcedureRunner( RoutingContext context )
4444
{
4545
this.context = context;
4646
}
4747

48-
public List<Record> run( Connection connection )
48+
public RoutingProcedureResponse run( Connection connection )
4949
{
50+
Statement procedure;
5051
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
5152
{
52-
invokedProcedure = new Statement( "CALL " + GET_ROUTING_TABLE,
53+
procedure = new Statement( "CALL " + GET_ROUTING_TABLE,
5354
parameters( GET_ROUTING_TABLE_PARAM, context.asMap() ) );
5455
}
5556
else
5657
{
57-
invokedProcedure = new Statement( "CALL " + GET_SERVERS );
58+
procedure = new Statement( "CALL " + GET_SERVERS );
5859
}
5960

60-
return runProcedure( connection, invokedProcedure );
61+
try
62+
{
63+
return new RoutingProcedureResponse( procedure, runProcedure( connection, procedure ) );
64+
}
65+
catch ( ClientException error )
66+
{
67+
return new RoutingProcedureResponse( procedure, error );
68+
}
6169
}
6270

6371
List<Record> runProcedure( Connection connection, Statement procedure )
6472
{
6573
return NetworkSession.run( connection, procedure, ResultResourcesHandler.NO_OP ).list();
6674
}
67-
68-
Statement invokedProcedure()
69-
{
70-
return invokedProcedure;
71-
}
7275
}

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
* limitations under the License.
1818
*/
1919
package org.neo4j.driver.internal.cluster;
20+
2021
import org.junit.Test;
2122

22-
import java.util.ArrayList;
2323
import java.util.HashMap;
2424
import java.util.HashSet;
2525
import java.util.Map;
@@ -59,8 +59,8 @@ public void shouldProtocolErrorWhenNoRecord() throws Throwable
5959
DEV_NULL_LOGGER, mockedRunner );
6060

6161
PooledConnection mockedConn = mock( PooledConnection.class );
62-
ArrayList<Record> emptyRecord = new ArrayList<>();
63-
when( mockedRunner.run( mockedConn ) ).thenReturn( emptyRecord );
62+
RoutingProcedureResponse noRecordsResponse = newRoutingResponse();
63+
when( mockedRunner.run( mockedConn ) ).thenReturn( noRecordsResponse );
6464

6565
// When
6666
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
@@ -89,7 +89,8 @@ public void shouldProtocolErrorWhenMoreThanOneRecord() throws Throwable
8989

9090
PooledConnection mockedConn = mock( PooledConnection.class );
9191
Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } );
92-
when( mockedRunner.run( mockedConn ) ).thenReturn( asList( aRecord, aRecord ) );
92+
RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord, aRecord );
93+
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
9394

9495
// When
9596
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
@@ -118,7 +119,8 @@ public void shouldProtocolErrorWhenUnparsableRecord() throws Throwable
118119

119120
PooledConnection mockedConn = mock( PooledConnection.class );
120121
Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } );
121-
when( mockedRunner.run( mockedConn ) ).thenReturn( asList( aRecord ) );
122+
RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord );
123+
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
122124

123125
// When
124126
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
@@ -152,7 +154,8 @@ public void shouldProtocolErrorWhenNoRouters() throws Throwable
152154
serverInfo( "READ", "one:1337", "two:1337" ),
153155
serverInfo( "WRITE", "one:1337" ) ) )
154156
} );
155-
when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) );
157+
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
158+
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
156159
when( mockedClock.millis() ).thenReturn( 12345L );
157160

158161
// When
@@ -187,7 +190,8 @@ public void shouldProtocolErrorWhenNoReaders() throws Throwable
187190
serverInfo( "WRITE", "one:1337" ),
188191
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
189192
} );
190-
when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) );
193+
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
194+
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
191195
when( mockedClock.millis() ).thenReturn( 12345L );
192196

193197
// When
@@ -254,7 +258,8 @@ public void shouldReturnSuccessResultWhenNoError() throws Throwable
254258
serverInfo( "WRITE", "one:1337" ),
255259
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
256260
} );
257-
when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) );
261+
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
262+
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
258263
when( mockedClock.millis() ).thenReturn( 12345L );
259264

260265
// When
@@ -289,8 +294,11 @@ private static Set<BoltServerAddress> serverSet( String... addresses )
289294

290295
private static RoutingProcedureRunner newProcedureRunnerMock()
291296
{
292-
RoutingProcedureRunner mock = mock( RoutingProcedureRunner.class );
293-
when( mock.invokedProcedure() ).thenReturn( new Statement( "procedure" ) );
294-
return mock;
297+
return mock( RoutingProcedureRunner.class );
298+
}
299+
300+
private static RoutingProcedureResponse newRoutingResponse( Record... records )
301+
{
302+
return new RoutingProcedureResponse( new Statement( "procedure" ), asList( records ) );
295303
}
296304
}

0 commit comments

Comments
 (0)