Skip to content

Commit 8d1e7a3

Browse files
authored
Merge pull request #770 from bigmontz/routing
Creating protocol 4.3 and uses the Route Message to get the routing table
2 parents 1494935 + bb4cd2b commit 8d1e7a3

29 files changed

+2080
-166
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
2626
import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41;
2727
import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42;
28+
import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43;
2829

2930
import static io.netty.buffer.Unpooled.copyInt;
3031
import static io.netty.buffer.Unpooled.unreleasableBuffer;
@@ -41,9 +42,9 @@ public final class BoltProtocolUtil
4142

4243
private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt(
4344
BOLT_MAGIC_PREAMBLE,
45+
BoltProtocolV43.VERSION.toInt(),
4446
BoltProtocolV42.VERSION.toInt(),
4547
BoltProtocolV41.VERSION.toInt(),
46-
BoltProtocolV4.VERSION.toInt(),
4748
BoltProtocolV3.VERSION.toInt() ) ).asReadOnly();
4849

4950
private static final String HANDSHAKE_STRING = createHandshakeString();

driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
import static org.neo4j.driver.Values.value;
3535
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;
3636

37-
public class MultiDatabasesRoutingProcedureRunner extends RoutingProcedureRunner
37+
38+
/**
39+
* This implementation of the {@link RoutingProcedureRunner} works with multi database versions of Neo4j calling
40+
* the procedure `dbms.routing.getRoutingTable`
41+
*/
42+
public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingProcedureRunner
3843
{
3944
static final String DATABASE_NAME = "database";
4045
static final String MULTI_DB_GET_ROUTING_TABLE = String.format( "CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME );
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
import java.util.function.Supplier;
27+
import java.util.stream.Collectors;
28+
29+
import org.neo4j.driver.AccessMode;
30+
import org.neo4j.driver.Bookmark;
31+
import org.neo4j.driver.Query;
32+
import org.neo4j.driver.Record;
33+
import org.neo4j.driver.Value;
34+
import org.neo4j.driver.Values;
35+
import org.neo4j.driver.internal.DatabaseName;
36+
import org.neo4j.driver.internal.InternalRecord;
37+
import org.neo4j.driver.internal.async.connection.DirectConnection;
38+
import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler;
39+
import org.neo4j.driver.internal.messaging.request.RouteMessage;
40+
import org.neo4j.driver.internal.spi.Connection;
41+
42+
import static java.util.Collections.singletonList;
43+
44+
/**
45+
* This implementation of the {@link RoutingProcedureRunner} access the routing procedure
46+
* through the bolt's ROUTE message.
47+
*/
48+
public class RouteMessageRoutingProcedureRunner implements RoutingProcedureRunner
49+
{
50+
private final Map<String,Value> routingContext;
51+
private final Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture;
52+
53+
public RouteMessageRoutingProcedureRunner( RoutingContext routingContext )
54+
{
55+
this( routingContext, CompletableFuture::new );
56+
}
57+
58+
protected RouteMessageRoutingProcedureRunner( RoutingContext routingContext, Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture )
59+
{
60+
this.routingContext = routingContext
61+
.toMap()
62+
.entrySet()
63+
.stream()
64+
.collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) );
65+
this.createCompletableFuture = createCompletableFuture;
66+
}
67+
68+
@Override
69+
public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
70+
{
71+
CompletableFuture<Map<String,Value>> completableFuture = createCompletableFuture.get();
72+
73+
DirectConnection directConnection = toDirectConnection( connection, databaseName );
74+
directConnection.writeAndFlush( new RouteMessage( routingContext, databaseName.databaseName().orElse( null ) ),
75+
new RouteMessageResponseHandler( completableFuture ) );
76+
return completableFuture
77+
.thenApply( routingTable -> new RoutingProcedureResponse( getQuery( databaseName ), singletonList( toRecord( routingTable ) ) ) )
78+
.exceptionally( throwable -> new RoutingProcedureResponse( getQuery( databaseName ), throwable.getCause() ) )
79+
.thenCompose( routingProcedureResponse -> directConnection.release().thenApply( ignore -> routingProcedureResponse ) );
80+
}
81+
82+
private Record toRecord( Map<String,Value> routingTable )
83+
{
84+
return new InternalRecord( new ArrayList<>( routingTable.keySet() ), routingTable.values().toArray( new Value[0] ) );
85+
}
86+
87+
private DirectConnection toDirectConnection( Connection connection, DatabaseName databaseName )
88+
{
89+
return new DirectConnection( connection, databaseName, AccessMode.READ );
90+
}
91+
92+
private Query getQuery( DatabaseName databaseName )
93+
{
94+
Map<String,Object> params = new HashMap<>();
95+
params.put( "routingContext", routingContext );
96+
params.put( "databaseName", databaseName.databaseName().orElse( null ) );
97+
return new Query( "ROUTE $routingContext $databaseName", params );
98+
}
99+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,43 +33,53 @@
3333

3434
import static java.lang.String.format;
3535
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
36+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsRouteMessage;
3637

3738
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
3839
{
3940
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";
4041

4142
private final Clock clock;
42-
private final RoutingProcedureRunner routingProcedureRunner;
43+
private final RoutingProcedureRunner singleDatabaseRoutingProcedureRunner;
4344
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
45+
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;
4446

4547
public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext routingContext )
4648
{
47-
this( clock, new RoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ) );
49+
this( clock, new SingleDatabaseRoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ),
50+
new RouteMessageRoutingProcedureRunner( routingContext ) );
4851
}
4952

50-
RoutingProcedureClusterCompositionProvider( Clock clock, RoutingProcedureRunner routingProcedureRunner,
51-
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner )
53+
RoutingProcedureClusterCompositionProvider( Clock clock, SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner,
54+
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner,
55+
RouteMessageRoutingProcedureRunner routeMessageRoutingProcedureRunner )
5256
{
5357
this.clock = clock;
54-
this.routingProcedureRunner = routingProcedureRunner;
58+
this.singleDatabaseRoutingProcedureRunner = singleDatabaseRoutingProcedureRunner;
5559
this.multiDatabaseRoutingProcedureRunner = multiDatabaseRoutingProcedureRunner;
60+
this.routeMessageRoutingProcedureRunner = routeMessageRoutingProcedureRunner;
5661
}
5762

5863
@Override
5964
public CompletionStage<ClusterComposition> getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark )
6065
{
6166
RoutingProcedureRunner runner;
62-
if ( supportsMultiDatabase( connection ) )
67+
68+
if ( supportsRouteMessage( connection ) )
69+
{
70+
runner = routeMessageRoutingProcedureRunner;
71+
}
72+
else if ( supportsMultiDatabase( connection ) )
6373
{
6474
runner = multiDatabaseRoutingProcedureRunner;
6575
}
6676
else
6777
{
68-
runner = routingProcedureRunner;
78+
runner = singleDatabaseRoutingProcedureRunner;
6979
}
7080

7181
return runner.run( connection, databaseName, bookmark )
72-
.thenApply( this::processRoutingResponse );
82+
.thenApply( this::processRoutingResponse );
7383
}
7484

7585
private ClusterComposition processRoutingResponse( RoutingProcedureResponse response )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 13 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,112 +18,24 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.List;
22-
import java.util.concurrent.CompletionException;
2321
import java.util.concurrent.CompletionStage;
2422

25-
import org.neo4j.driver.AccessMode;
2623
import org.neo4j.driver.Bookmark;
27-
import org.neo4j.driver.Query;
28-
import org.neo4j.driver.Record;
29-
import org.neo4j.driver.TransactionConfig;
30-
import org.neo4j.driver.async.ResultCursor;
31-
import org.neo4j.driver.exceptions.ClientException;
32-
import org.neo4j.driver.exceptions.FatalDiscoveryException;
33-
import org.neo4j.driver.internal.BookmarkHolder;
3424
import org.neo4j.driver.internal.DatabaseName;
35-
import org.neo4j.driver.internal.async.connection.DirectConnection;
3625
import org.neo4j.driver.internal.spi.Connection;
37-
import org.neo4j.driver.internal.util.Futures;
38-
import org.neo4j.driver.internal.util.ServerVersion;
3926

40-
import static org.neo4j.driver.Values.parameters;
41-
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
42-
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
43-
44-
public class RoutingProcedureRunner
27+
/**
28+
* Interface which defines the standard way to get the routing table
29+
*/
30+
public interface RoutingProcedureRunner
4531
{
46-
static final String ROUTING_CONTEXT = "context";
47-
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";
48-
49-
final RoutingContext context;
50-
51-
public RoutingProcedureRunner( RoutingContext context )
52-
{
53-
this.context = context;
54-
}
55-
56-
public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
57-
{
58-
DirectConnection delegate = connection( connection );
59-
Query procedure = procedureQuery( connection.serverVersion(), databaseName );
60-
BookmarkHolder bookmarkHolder = bookmarkHolder( bookmark );
61-
return runProcedure( delegate, procedure, bookmarkHolder )
62-
.thenCompose( records -> releaseConnection( delegate, records ) )
63-
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
64-
}
65-
66-
DirectConnection connection( Connection connection )
67-
{
68-
return new DirectConnection( connection, defaultDatabase(), AccessMode.WRITE );
69-
}
70-
71-
Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName )
72-
{
73-
if ( databaseName.databaseName().isPresent() )
74-
{
75-
throw new FatalDiscoveryException( String.format(
76-
"Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
77-
"Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) );
78-
}
79-
return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) );
80-
}
81-
82-
BookmarkHolder bookmarkHolder( Bookmark ignored )
83-
{
84-
return BookmarkHolder.NO_OP;
85-
}
86-
87-
CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
88-
{
89-
return connection.protocol()
90-
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
91-
.asyncResult().thenCompose( ResultCursor::listAsync );
92-
}
93-
94-
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
95-
{
96-
// It is not strictly required to release connection after routing procedure invocation because it'll
97-
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
98-
// in background. However, releasing it early as part of whole chain makes it easier to reason about
99-
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
100-
// routing table will be closed immediately.
101-
return connection.release().thenApply( ignore -> records );
102-
}
103-
104-
private static RoutingProcedureResponse processProcedureResponse(Query procedure, List<Record> records,
105-
Throwable error )
106-
{
107-
Throwable cause = Futures.completionExceptionCause( error );
108-
if ( cause != null )
109-
{
110-
return handleError( procedure, cause );
111-
}
112-
else
113-
{
114-
return new RoutingProcedureResponse( procedure, records );
115-
}
116-
}
117-
118-
private static RoutingProcedureResponse handleError(Query procedure, Throwable error )
119-
{
120-
if ( error instanceof ClientException )
121-
{
122-
return new RoutingProcedureResponse( procedure, error );
123-
}
124-
else
125-
{
126-
throw new CompletionException( error );
127-
}
128-
}
32+
/**
33+
* Run the calls to the server
34+
*
35+
* @param connection The connection which will be used to call the server
36+
* @param databaseName The database name
37+
* @param bookmark The bookmark used to query the routing information
38+
* @return The routing table
39+
*/
40+
CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark );
12941
}

0 commit comments

Comments
 (0)