Skip to content

Commit 6a9fdf9

Browse files
committed
Adding routing context into HELLO message so that server can make routing decisions
1 parent 1254103 commit 6a9fdf9

33 files changed

+278
-55
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,18 @@ public final Driver newInstance ( URI uri, AuthToken authToken, RoutingSettings
9393
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
9494

9595
MetricsProvider metricsProvider = createDriverMetrics( config, createClock() );
96-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
96+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config,
97+
ownsEventLoopGroup, newRoutingSettings.routingContext() );
9798

9899
return createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metricsProvider, config );
99100
}
100101

101102
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
102-
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
103+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup, RoutingContext routingContext )
103104
{
104105
Clock clock = createClock();
105106
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
106-
ChannelConnector connector = createConnector( settings, securityPlan, config, clock );
107+
ChannelConnector connector = createConnector( settings, securityPlan, config, clock, routingContext );
107108
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
108109
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
109110
config.idleTimeBeforeConnectionTest()
@@ -124,9 +125,9 @@ protected static MetricsProvider createDriverMetrics( Config config, Clock clock
124125
}
125126

126127
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,
127-
Config config, Clock clock )
128+
Config config, Clock clock, RoutingContext routingContext )
128129
{
129-
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock );
130+
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock, routingContext );
130131
}
131132

132133
private InternalDriver createDriver( URI uri, SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool,

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.neo4j.driver.internal.BoltServerAddress;
3131
import org.neo4j.driver.internal.ConnectionSettings;
3232
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
33+
import org.neo4j.driver.internal.cluster.RoutingContext;
3334
import org.neo4j.driver.internal.security.InternalAuthToken;
3435
import org.neo4j.driver.internal.security.SecurityPlan;
3536
import org.neo4j.driver.internal.util.Clock;
@@ -45,23 +46,25 @@ public class ChannelConnectorImpl implements ChannelConnector
4546
{
4647
private final String userAgent;
4748
private final Map<String,Value> authToken;
49+
private final RoutingContext routingContext;
4850
private final SecurityPlan securityPlan;
4951
private final ChannelPipelineBuilder pipelineBuilder;
5052
private final int connectTimeoutMillis;
5153
private final Logging logging;
5254
private final Clock clock;
5355

5456
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
55-
Clock clock )
57+
Clock clock, RoutingContext routingContext )
5658
{
57-
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock );
59+
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock, routingContext );
5860
}
5961

6062
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
61-
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock )
63+
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock, RoutingContext routingContext )
6264
{
6365
this.userAgent = connectionSettings.userAgent();
6466
this.authToken = tokenAsMap( connectionSettings.authToken() );
67+
this.routingContext = routingContext;
6568
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
6669
this.securityPlan = requireNonNull( securityPlan );
6770
this.pipelineBuilder = pipelineBuilder;
@@ -113,7 +116,7 @@ private void installHandshakeCompletedListeners( ChannelPromise handshakeComplet
113116

114117
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
115118
// set to send/receive messages for a selected protocol version
116-
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
119+
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, routingContext, connectionInitialized ) );
117120
}
118121

119122
private static Map<String,Value> tokenAsMap( AuthToken token )

driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.Map;
2626

27+
import org.neo4j.driver.internal.cluster.RoutingContext;
2728
import org.neo4j.driver.internal.messaging.BoltProtocol;
2829
import org.neo4j.driver.Value;
2930

@@ -33,13 +34,15 @@ public class HandshakeCompletedListener implements ChannelFutureListener
3334
{
3435
private final String userAgent;
3536
private final Map<String,Value> authToken;
37+
private final RoutingContext routingContext;
3638
private final ChannelPromise connectionInitializedPromise;
3739

3840
public HandshakeCompletedListener( String userAgent, Map<String,Value> authToken,
39-
ChannelPromise connectionInitializedPromise )
41+
RoutingContext routingContext, ChannelPromise connectionInitializedPromise )
4042
{
4143
this.userAgent = requireNonNull( userAgent );
4244
this.authToken = requireNonNull( authToken );
45+
this.routingContext = routingContext;
4346
this.connectionInitializedPromise = requireNonNull( connectionInitializedPromise );
4447
}
4548

@@ -49,7 +52,7 @@ public void operationComplete( ChannelFuture future )
4952
if ( future.isSuccess() )
5053
{
5154
BoltProtocol protocol = BoltProtocol.forChannel( future.channel() );
52-
protocol.initializeChannel( userAgent, authToken, connectionInitializedPromise );
55+
protocol.initializeChannel( userAgent, authToken, routingContext, connectionInitializedPromise );
5356
}
5457
else
5558
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ public class RoutingContext
3030
public static final RoutingContext EMPTY = new RoutingContext();
3131

3232
private final Map<String,String> context;
33+
private final boolean isServerRoutingEnabled;
3334

3435
private RoutingContext()
3536
{
37+
this.isServerRoutingEnabled = true;
3638
this.context = emptyMap();
3739
}
3840

3941
public RoutingContext( URI uri )
4042
{
43+
this.isServerRoutingEnabled = uri.getScheme().startsWith( "neo4j" );
4144
this.context = unmodifiableMap( parseParameters( uri ) );
4245
}
4346

@@ -51,10 +54,15 @@ public Map<String,String> asMap()
5154
return context;
5255
}
5356

57+
public boolean isServerRoutingEnabled()
58+
{
59+
return isServerRoutingEnabled;
60+
}
61+
5462
@Override
5563
public String toString()
5664
{
57-
return "RoutingContext" + context;
65+
return "RoutingContext" + context + " isServerRoutingEnabled=" + isServerRoutingEnabled;
5866
}
5967

6068
private static Map<String,String> parseParameters( URI uri )

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.BookmarkHolder;
3535
import org.neo4j.driver.internal.InternalBookmark;
3636
import org.neo4j.driver.internal.async.UnmanagedTransaction;
37+
import org.neo4j.driver.internal.cluster.RoutingContext;
3738
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
3839
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
3940
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
@@ -56,9 +57,10 @@ public interface BoltProtocol
5657
*
5758
* @param userAgent the user agent string.
5859
* @param authToken the authentication token.
60+
* @param routingContext the configured routing context
5961
* @param channelInitializedPromise the promise to be notified when initialization is completed.
6062
*/
61-
void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise );
63+
void initializeChannel( String userAgent, Map<String,Value> authToken, RoutingContext routingContext, ChannelPromise channelInitializedPromise );
6264

6365
/**
6466
* Prepare to close channel before it is closed.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/HelloMessage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class HelloMessage extends MessageWithMetadata
3232
public final static byte SIGNATURE = 0x01;
3333

3434
private static final String USER_AGENT_METADATA_KEY = "user_agent";
35+
private static final String ROUTING_CONTEXT_METADATA_KEY = "routing";
3536

36-
public HelloMessage( String userAgent, Map<String,Value> authToken )
37+
public HelloMessage( String userAgent, Map<String,Value> authToken, Map<String,String> routingContext )
3738
{
38-
super( buildMetadata( userAgent, authToken ) );
39+
super( buildMetadata( userAgent, authToken, routingContext ) );
3940
}
4041

4142
@Override
@@ -73,10 +74,11 @@ public String toString()
7374
return "HELLO " + metadataCopy;
7475
}
7576

76-
private static Map<String,Value> buildMetadata( String userAgent, Map<String,Value> authToken )
77+
private static Map<String,Value> buildMetadata( String userAgent, Map<String,Value> authToken, Map<String,String> routingContext )
7778
{
7879
Map<String,Value> result = new HashMap<>( authToken );
7980
result.put( USER_AGENT_METADATA_KEY, value( userAgent ) );
81+
result.put( ROUTING_CONTEXT_METADATA_KEY, value( routingContext ) );
8082
return result;
8183
}
8284
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelPromise;
2323

24+
import java.util.Collections;
2425
import java.util.Map;
2526
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.CompletionStage;
@@ -32,6 +33,7 @@
3233
import org.neo4j.driver.internal.BookmarkHolder;
3334
import org.neo4j.driver.internal.DatabaseName;
3435
import org.neo4j.driver.internal.async.UnmanagedTransaction;
36+
import org.neo4j.driver.internal.cluster.RoutingContext;
3537
import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
3638
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
3739
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -75,11 +77,20 @@ public MessageFormat createMessageFormat()
7577
}
7678

7779
@Override
78-
public void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise )
80+
public void initializeChannel( String userAgent, Map<String,Value> authToken, RoutingContext routingContext, ChannelPromise channelInitializedPromise )
7981
{
8082
Channel channel = channelInitializedPromise.channel();
83+
HelloMessage message;
84+
85+
if ( routingContext.isServerRoutingEnabled() )
86+
{
87+
message = new HelloMessage( userAgent, authToken, routingContext.asMap() );
88+
}
89+
else
90+
{
91+
message = new HelloMessage( userAgent, authToken, null );
92+
}
8193

82-
HelloMessage message = new HelloMessage( userAgent, authToken );
8394
HelloResponseHandler handler = new HelloResponseHandler( channelInitializedPromise );
8495

8596
messageDispatcher( channel ).enqueue( handler );

driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.neo4j.driver.internal.async.connection.ChannelConnector;
4747
import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
4848
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
49+
import org.neo4j.driver.internal.cluster.RoutingContext;
4950
import org.neo4j.driver.internal.security.SecurityPlanImpl;
5051
import org.neo4j.driver.internal.security.SecurityPlan;
5152
import org.neo4j.driver.internal.util.FakeClock;
@@ -231,7 +232,7 @@ private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan sec
231232
int connectTimeoutMillis )
232233
{
233234
ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis );
234-
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() );
235+
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock(), RoutingContext.EMPTY );
235236
}
236237

237238
private static SecurityPlan trustAllCertificates() throws GeneralSecurityException

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.neo4j.driver.internal.async.connection.ChannelConnector;
5050
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
5151
import org.neo4j.driver.internal.async.pool.PoolSettings;
52+
import org.neo4j.driver.internal.cluster.RoutingContext;
5253
import org.neo4j.driver.internal.cluster.RoutingSettings;
5354
import org.neo4j.driver.internal.metrics.MetricsProvider;
5455
import org.neo4j.driver.internal.retry.RetrySettings;
@@ -445,14 +446,15 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
445446

446447
@Override
447448
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
448-
MetricsProvider ignored, Config config, boolean ownsEventLoopGroup )
449+
MetricsProvider ignored, Config config, boolean ownsEventLoopGroup,
450+
RoutingContext routingContext )
449451
{
450452
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
451453
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
452454
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
453455
config.idleTimeBeforeConnectionTest() );
454456
Clock clock = createClock();
455-
ChannelConnector connector = super.createConnector( connectionSettings, securityPlan, config, clock );
457+
ChannelConnector connector = super.createConnector( connectionSettings, securityPlan, config, clock, routingContext );
456458
connectionPool = new MemorizingConnectionPool( connector, bootstrap, poolSettings, config.logging(), clock, ownsEventLoopGroup );
457459
return connectionPool;
458460
}

driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,46 @@ void shouldSendRoutingContextToServer() throws Exception
892892
}
893893
}
894894

895+
@Test
896+
void shouldSendRoutingContextInHelloMessage() throws Exception
897+
{
898+
// stub server is both a router and reader
899+
StubServer server = StubServer.start( "routing_context_in_hello_neo4j.script", 9001 );
900+
901+
URI uri = URI.create( "neo4j://127.0.0.1:9001/?policy=my_policy&region=china" );
902+
try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session() )
903+
{
904+
List<Record> records = session.run( "MATCH (n) RETURN n.name AS name" ).list();
905+
assertEquals( 2, records.size() );
906+
assertEquals( "Alice", records.get( 0 ).get( "name" ).asString() );
907+
assertEquals( "Bob", records.get( 1 ).get( "name" ).asString() );
908+
}
909+
finally
910+
{
911+
assertEquals( 0, server.exitStatus() );
912+
}
913+
}
914+
915+
@Test
916+
void shouldSendEmptyRoutingContextInHelloMessage() throws Exception
917+
{
918+
// stub server is both a router and reader
919+
StubServer server = StubServer.start( "empty_routing_context_in_hello_neo4j.script", 9001 );
920+
921+
URI uri = URI.create( "neo4j://127.0.0.1:9001/" );
922+
try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session() )
923+
{
924+
List<Record> records = session.run( "MATCH (n) RETURN n.name AS name" ).list();
925+
assertEquals( 2, records.size() );
926+
assertEquals( "Alice", records.get( 0 ).get( "name" ).asString() );
927+
assertEquals( "Bob", records.get( 1 ).get( "name" ).asString() );
928+
}
929+
finally
930+
{
931+
assertEquals( 0, server.exitStatus() );
932+
}
933+
}
934+
895935
@Test
896936
void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception
897937
{

driver/src/test/java/org/neo4j/driver/internal/CustomSecurityPlanTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.AuthToken;
2929
import org.neo4j.driver.AuthTokens;
3030
import org.neo4j.driver.Config;
31+
import org.neo4j.driver.internal.cluster.RoutingContext;
3132
import org.neo4j.driver.internal.cluster.RoutingSettings;
3233
import org.neo4j.driver.internal.metrics.MetricsProvider;
3334
import org.neo4j.driver.internal.retry.RetrySettings;
@@ -74,10 +75,11 @@ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory
7475

7576
@Override
7677
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
77-
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
78+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup,
79+
RoutingContext routingContext )
7880
{
7981
capturedSecurityPlans.add( securityPlan );
80-
return super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
82+
return super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup, routingContext );
8183
}
8284
}
8385
}

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,24 @@ void shouldSendMultipleBookmarks() throws Exception
125125
}
126126
}
127127

128+
@Test
129+
void shouldSendNullRoutingContextForBoltUri() throws Exception
130+
{
131+
StubServer server = StubServer.start( "hello_with_routing_context_bolt.script", 9001 );
132+
133+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
134+
Session session = driver.session() )
135+
{
136+
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
137+
assertEquals( asList( "Foo", "Bar" ), names );
138+
139+
}
140+
finally
141+
{
142+
assertEquals( 0, server.exitStatus() );
143+
}
144+
}
145+
128146
@Test
129147
void shouldLogConnectionIdInDebugMode() throws Exception
130148
{

0 commit comments

Comments
 (0)