Skip to content

Commit b245275

Browse files
committed
Async handling of routing procedure responses
This commit makes `ClusterCompositionProvider` able to interpret async responses from `RoutingProcedureRunner`.
1 parent d0f9341 commit b245275

File tree

4 files changed

+54
-31
lines changed

4 files changed

+54
-31
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,15 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.internal.async.AsyncConnection;
2124
import org.neo4j.driver.internal.spi.Connection;
2225

2326
public interface ClusterCompositionProvider
2427
{
2528
ClusterCompositionResponse getClusterComposition( Connection connection );
29+
30+
CompletionStage<ClusterCompositionResponse> getClusterComposition(
31+
CompletionStage<AsyncConnection> connectionStage );
2632
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletionStage;
2223

24+
import org.neo4j.driver.internal.async.AsyncConnection;
2325
import org.neo4j.driver.internal.spi.Connection;
2426
import org.neo4j.driver.internal.util.Clock;
2527
import org.neo4j.driver.v1.Logger;
@@ -55,7 +57,19 @@ public RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, Rout
5557
public ClusterCompositionResponse getClusterComposition( Connection connection )
5658
{
5759
RoutingProcedureResponse response = routingProcedureRunner.run( connection );
60+
return processRoutingResponse( response );
61+
}
62+
63+
@Override
64+
public CompletionStage<ClusterCompositionResponse> getClusterComposition(
65+
CompletionStage<AsyncConnection> connectionStage )
66+
{
67+
return routingProcedureRunner.run( connectionStage )
68+
.thenApply( this::processRoutingResponse );
69+
}
5870

71+
private ClusterCompositionResponse processRoutingResponse( RoutingProcedureResponse response )
72+
{
5973
if ( !response.isSuccess() )
6074
{
6175
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import static org.mockito.Mockito.mock;
6666
import static org.mockito.Mockito.verify;
6767
import static org.mockito.Mockito.when;
68-
import static org.neo4j.driver.internal.cluster.ClusterCompositionProviderTest.serverInfo;
68+
import static org.neo4j.driver.internal.cluster.RoutingProcedureClusterCompositionProviderTest.serverInfo;
6969
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
7070
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
7171
import static org.neo4j.driver.v1.Values.value;

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java renamed to driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
import java.util.HashSet;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.concurrent.CompletionStage;
2728

2829
import org.neo4j.driver.internal.InternalRecord;
30+
import org.neo4j.driver.internal.async.AsyncConnection;
2931
import org.neo4j.driver.internal.net.BoltServerAddress;
30-
import org.neo4j.driver.internal.spi.PooledConnection;
3132
import org.neo4j.driver.internal.util.Clock;
3233
import org.neo4j.driver.internal.value.StringValue;
3334
import org.neo4j.driver.v1.Record;
@@ -37,6 +38,7 @@
3738
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3839

3940
import static java.util.Arrays.asList;
41+
import static java.util.concurrent.CompletableFuture.completedFuture;
4042
import static org.hamcrest.MatcherAssert.assertThat;
4143
import static org.hamcrest.Matchers.containsString;
4244
import static org.hamcrest.Matchers.instanceOf;
@@ -45,25 +47,26 @@
4547
import static org.mockito.Mockito.doThrow;
4648
import static org.mockito.Mockito.mock;
4749
import static org.mockito.Mockito.when;
50+
import static org.neo4j.driver.internal.async.Futures.getBlocking;
4851
import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER;
4952
import static org.neo4j.driver.v1.Values.value;
5053

51-
public class ClusterCompositionProviderTest
54+
public class RoutingProcedureClusterCompositionProviderTest
5255
{
5356
@Test
54-
public void shouldProtocolErrorWhenNoRecord() throws Throwable
57+
public void shouldProtocolErrorWhenNoRecord()
5558
{
5659
// Given
5760
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
5861
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ),
5962
DEV_NULL_LOGGER, mockedRunner );
6063

61-
PooledConnection mockedConn = mock( PooledConnection.class );
64+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
6265
RoutingProcedureResponse noRecordsResponse = newRoutingResponse();
63-
when( mockedRunner.run( mockedConn ) ).thenReturn( noRecordsResponse );
66+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( noRecordsResponse ) );
6467

6568
// When
66-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
69+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
6770

6871
// Then
6972
assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) );
@@ -80,20 +83,20 @@ public void shouldProtocolErrorWhenNoRecord() throws Throwable
8083
}
8184

8285
@Test
83-
public void shouldProtocolErrorWhenMoreThanOneRecord() throws Throwable
86+
public void shouldProtocolErrorWhenMoreThanOneRecord()
8487
{
8588
// Given
8689
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
8790
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ),
8891
DEV_NULL_LOGGER, mockedRunner );
8992

90-
PooledConnection mockedConn = mock( PooledConnection.class );
93+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
9194
Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } );
9295
RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord, aRecord );
93-
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
96+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( routingResponse ) );
9497

9598
// When
96-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
99+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
97100

98101
// Then
99102
assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) );
@@ -110,20 +113,20 @@ public void shouldProtocolErrorWhenMoreThanOneRecord() throws Throwable
110113
}
111114

112115
@Test
113-
public void shouldProtocolErrorWhenUnparsableRecord() throws Throwable
116+
public void shouldProtocolErrorWhenUnparsableRecord()
114117
{
115118
// Given
116119
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
117120
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ),
118121
DEV_NULL_LOGGER, mockedRunner );
119122

120-
PooledConnection mockedConn = mock( PooledConnection.class );
123+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
121124
Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } );
122125
RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord );
123-
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
126+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( routingResponse ) );
124127

125128
// When
126-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
129+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
127130

128131
// Then
129132
assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) );
@@ -140,26 +143,26 @@ public void shouldProtocolErrorWhenUnparsableRecord() throws Throwable
140143
}
141144

142145
@Test
143-
public void shouldProtocolErrorWhenNoRouters() throws Throwable
146+
public void shouldProtocolErrorWhenNoRouters()
144147
{
145148
// Given
146149
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
147150
Clock mockedClock = mock( Clock.class );
148151
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock,
149152
DEV_NULL_LOGGER, mockedRunner );
150153

151-
PooledConnection mockedConn = mock( PooledConnection.class );
154+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
152155
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
153156
value( 100 ), value( asList(
154157
serverInfo( "READ", "one:1337", "two:1337" ),
155158
serverInfo( "WRITE", "one:1337" ) ) )
156159
} );
157160
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
158-
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
161+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( routingResponse ) );
159162
when( mockedClock.millis() ).thenReturn( 12345L );
160163

161164
// When
162-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
165+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
163166

164167
// Then
165168
assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) );
@@ -176,26 +179,26 @@ public void shouldProtocolErrorWhenNoRouters() throws Throwable
176179
}
177180

178181
@Test
179-
public void shouldProtocolErrorWhenNoReaders() throws Throwable
182+
public void shouldProtocolErrorWhenNoReaders()
180183
{
181184
// Given
182185
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
183186
Clock mockedClock = mock( Clock.class );
184187
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock,
185188
DEV_NULL_LOGGER, mockedRunner );
186189

187-
PooledConnection mockedConn = mock( PooledConnection.class );
190+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
188191
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
189192
value( 100 ), value( asList(
190193
serverInfo( "WRITE", "one:1337" ),
191194
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
192195
} );
193196
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
194-
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
197+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( routingResponse ) );
195198
when( mockedClock.millis() ).thenReturn( 12345L );
196199

197200
// When
198-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
201+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
199202

200203
// Then
201204
assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) );
@@ -213,26 +216,26 @@ public void shouldProtocolErrorWhenNoReaders() throws Throwable
213216

214217

215218
@Test
216-
public void shouldPropagateConnectionFailureExceptions() throws Exception
219+
public void shouldPropagateConnectionFailureExceptions()
217220
{
218221
// Given
219222
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
220223
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ),
221224
DEV_NULL_LOGGER, mockedRunner );
222225

223-
PooledConnection mockedConn = mock( PooledConnection.class );
226+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
224227
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
225228
value( 100 ), value( asList(
226229
serverInfo( "WRITE", "one:1337" ),
227230
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
228231
} );
229232
doThrow( new ServiceUnavailableException( "Connection breaks during cypher execution" ) )
230-
.when( mockedRunner ).run( mockedConn );
233+
.when( mockedRunner ).run( connectionStage );
231234

232235
// When & Then
233236
try
234237
{
235-
provider.getClusterComposition( mockedConn );
238+
provider.getClusterComposition( connectionStage );
236239
fail( "Expecting a failure but not triggered." );
237240
}
238241
catch( Exception e )
@@ -243,27 +246,27 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
243246
}
244247

245248
@Test
246-
public void shouldReturnSuccessResultWhenNoError() throws Throwable
249+
public void shouldReturnSuccessResultWhenNoError()
247250
{
248251
// Given
249252
Clock mockedClock = mock( Clock.class );
250253
RoutingProcedureRunner mockedRunner = newProcedureRunnerMock();
251254
ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock,
252255
DEV_NULL_LOGGER, mockedRunner );
253256

254-
PooledConnection mockedConn = mock( PooledConnection.class );
257+
CompletionStage<AsyncConnection> connectionStage = completedFuture( mock( AsyncConnection.class ) );
255258
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
256259
value( 100 ), value( asList(
257260
serverInfo( "READ", "one:1337", "two:1337" ),
258261
serverInfo( "WRITE", "one:1337" ),
259262
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
260263
} );
261264
RoutingProcedureResponse routingResponse = newRoutingResponse( record );
262-
when( mockedRunner.run( mockedConn ) ).thenReturn( routingResponse );
265+
when( mockedRunner.run( connectionStage ) ).thenReturn( completedFuture( routingResponse ) );
263266
when( mockedClock.millis() ).thenReturn( 12345L );
264267

265268
// When
266-
ClusterCompositionResponse response = provider.getClusterComposition( mockedConn );
269+
ClusterCompositionResponse response = getBlocking( provider.getClusterComposition( connectionStage ) );
267270

268271
// Then
269272
assertThat( response, instanceOf( ClusterCompositionResponse.Success.class ) );

0 commit comments

Comments
 (0)