Skip to content

Commit 8471f76

Browse files
committed
Introduce home database resolution cache
This update adds support for Bolt protocol 5.8 and introduces home database resolution cache that works with Bolt protocol 5.8+ when routing scheme is used. The cache is maintained by the driver internally and is not exposed in the API, its purpose is to reduce the number of Bolt exchanges when home database resolution is needed.
1 parent 743091e commit 8471f76

File tree

82 files changed

+2112
-294
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+2112
-294
lines changed

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public CompletionStage<BoltConnection> connect(
116116
String impersonatedUser,
117117
BoltProtocolVersion minVersion,
118118
NotificationConfig notificationConfig,
119-
Consumer<DatabaseName> databaseNameConsumer) {
119+
Consumer<DatabaseName> databaseNameConsumer,
120+
Map<String, Object> additionalParameters) {
120121
synchronized (this) {
121122
if (closeFuture != null) {
122123
return CompletableFuture.failedFuture(new IllegalStateException("Connection provider is closed."));
@@ -189,7 +190,8 @@ public CompletionStage<Void> verifyConnectivity(SecurityPlan securityPlan, Map<S
189190
null,
190191
null,
191192
null,
192-
(ignored) -> {})
193+
(ignored) -> {},
194+
Collections.emptyMap())
193195
.thenCompose(BoltConnection::close);
194196
}
195197

@@ -204,7 +206,8 @@ public CompletionStage<Boolean> supportsMultiDb(SecurityPlan securityPlan, Map<S
204206
null,
205207
null,
206208
null,
207-
(ignored) -> {})
209+
(ignored) -> {},
210+
Collections.emptyMap())
208211
.thenCompose(boltConnection -> {
209212
var supports = boltConnection.protocolVersion().compareTo(BoltProtocolV4.VERSION) >= 0;
210213
return boltConnection.close().thenApply(ignored -> supports);
@@ -222,7 +225,8 @@ public CompletionStage<Boolean> supportsSessionAuth(SecurityPlan securityPlan, M
222225
null,
223226
null,
224227
null,
225-
(ignored) -> {})
228+
(ignored) -> {},
229+
Collections.emptyMap())
226230
.thenCompose(boltConnection -> {
227231
var supports = BoltProtocolV51.VERSION.compareTo(boltConnection.protocolVersion()) <= 0;
228232
return boltConnection.close().thenApply(ignored -> supports);

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public final class BoltConnectionImpl implements BoltConnection {
8080
private final BoltServerAddress serverAddress;
8181
private final BoltProtocolVersion protocolVersion;
8282
private final boolean telemetrySupported;
83+
private final boolean serverSideRouting;
8384
private final AtomicReference<BoltConnectionState> stateRef = new AtomicReference<>(BoltConnectionState.OPEN);
8485
private final AtomicReference<CompletableFuture<AuthData>> authDataRef;
8586
private final Map<String, Value> routingContext;
@@ -104,6 +105,7 @@ public BoltConnectionImpl(
104105
this.serverAddress = Objects.requireNonNull(connection.serverAddress());
105106
this.protocolVersion = Objects.requireNonNull(connection.protocol().version());
106107
this.telemetrySupported = connection.isTelemetryEnabled();
108+
this.serverSideRouting = connection.isSsrEnabled();
107109
this.authDataRef = new AtomicReference<>(
108110
CompletableFuture.completedFuture(new AuthDataImpl(authMap, latestAuthMillisFuture.join())));
109111
this.valueFactory = Objects.requireNonNull(valueFactory);
@@ -177,8 +179,8 @@ public void onError(Throwable throwable) {
177179
}
178180

179181
@Override
180-
public void onSummary(Void summary) {
181-
handler.onBeginSummary(BeginSummaryImpl.INSTANCE);
182+
public void onSummary(BeginSummary summary) {
183+
handler.onBeginSummary(summary);
182184
}
183185
},
184186
logging,
@@ -520,6 +522,11 @@ public boolean telemetrySupported() {
520522
return telemetrySupported;
521523
}
522524

525+
@Override
526+
public boolean serverSideRoutingEnabled() {
527+
return serverSideRouting;
528+
}
529+
523530
private CompletionStage<Void> executeInEventLoop(Runnable runnable) {
524531
var executeFuture = new CompletableFuture<Void>();
525532
Runnable stageCompletingRunnable = () -> {
@@ -720,10 +727,6 @@ private void runIgnoringError(Runnable runnable) {
720727
}
721728
}
722729

723-
private static class BeginSummaryImpl implements BeginSummary {
724-
private static final BeginSummary INSTANCE = new BeginSummaryImpl();
725-
}
726-
727730
private static class TelemetrySummaryImpl implements TelemetrySummary {
728731
private static final TelemetrySummary INSTANCE = new TelemetrySummaryImpl();
729732
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class NetworkConnection implements Connection {
5353
private final String serverAgent;
5454
private final BoltServerAddress serverAddress;
5555
private final boolean telemetryEnabled;
56+
private final boolean ssrEnabled;
5657
private final BoltProtocol protocol;
5758

5859
private final Long connectionReadTimeout;
@@ -67,6 +68,7 @@ public NetworkConnection(Channel channel, LoggingProvider logging) {
6768
this.serverAgent = ChannelAttributes.serverAgent(channel);
6869
this.serverAddress = ChannelAttributes.serverAddress(channel);
6970
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
71+
this.ssrEnabled = ChannelAttributes.ssrEnabled(channel);
7072
this.protocol = BoltProtocol.forChannel(channel);
7173
this.connectionReadTimeout =
7274
ChannelAttributes.connectionReadTimeout(channel).orElse(null);
@@ -111,6 +113,11 @@ public boolean isTelemetryEnabled() {
111113
return telemetryEnabled;
112114
}
113115

116+
@Override
117+
public boolean isSsrEnabled() {
118+
return ssrEnabled;
119+
}
120+
114121
@Override
115122
public String serverAgent() {
116123
return serverAgent;

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/connection/BoltProtocolUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v42.BoltProtocolV42;
2828
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v44.BoltProtocolV44;
2929
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v5.BoltProtocolV5;
30-
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
30+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;
3131

3232
public final class BoltProtocolUtil {
3333
public static final int BOLT_MAGIC_PREAMBLE = 0x6060B017;
@@ -39,7 +39,7 @@ public final class BoltProtocolUtil {
3939

4040
private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer(copyInt(
4141
BOLT_MAGIC_PREAMBLE,
42-
BoltProtocolV57.VERSION.toIntRange(BoltProtocolV5.VERSION),
42+
BoltProtocolV58.VERSION.toIntRange(BoltProtocolV5.VERSION),
4343
BoltProtocolV44.VERSION.toIntRange(BoltProtocolV42.VERSION),
4444
BoltProtocolV41.VERSION.toInt(),
4545
BoltProtocolV3.VERSION.toInt()))

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/connection/ChannelAttributes.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public final class ChannelAttributes {
4545

4646
// configuration hints provided by the server
4747
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance("connectionReadTimeout");
48-
4948
private static final AttributeKey<Boolean> TELEMETRY_ENABLED = newInstance("telemetryEnabled");
49+
private static final AttributeKey<Boolean> SSR_ENABLED = newInstance("ssr.enabled");
5050

5151
private ChannelAttributes() {}
5252

@@ -153,6 +153,14 @@ public static boolean telemetryEnabled(Channel channel) {
153153
return Optional.ofNullable(get(channel, TELEMETRY_ENABLED)).orElse(false);
154154
}
155155

156+
public static void setSsrEnabled(Channel channel, Boolean telemetryEnabled) {
157+
setOnce(channel, SSR_ENABLED, telemetryEnabled);
158+
}
159+
160+
public static boolean ssrEnabled(Channel channel) {
161+
return Optional.ofNullable(get(channel, SSR_ENABLED)).orElse(false);
162+
}
163+
156164
private static <T> T get(Channel channel, AttributeKey<T> key) {
157165
return channel.attr(key).get();
158166
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/handlers/BeginTxResponseHandler.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,24 @@
2020

2121
import java.util.Arrays;
2222
import java.util.Map;
23+
import java.util.Optional;
2324
import java.util.concurrent.CompletableFuture;
25+
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
2426
import org.neo4j.driver.internal.bolt.api.values.Value;
2527
import org.neo4j.driver.internal.bolt.basicimpl.impl.spi.ResponseHandler;
2628

2729
public class BeginTxResponseHandler implements ResponseHandler {
28-
private final CompletableFuture<Void> beginTxFuture;
30+
private final CompletableFuture<BeginSummary> beginTxFuture;
2931

30-
public BeginTxResponseHandler(CompletableFuture<Void> beginTxFuture) {
32+
public BeginTxResponseHandler(CompletableFuture<BeginSummary> beginTxFuture) {
3133
this.beginTxFuture = requireNonNull(beginTxFuture);
3234
}
3335

3436
@Override
3537
public void onSuccess(Map<String, Value> metadata) {
36-
beginTxFuture.complete(null);
38+
var db = metadata.get("db");
39+
var databaseName = db != null ? db.asString() : null;
40+
beginTxFuture.complete(new BeginSummaryImpl(databaseName));
3741
}
3842

3943
@Override
@@ -46,4 +50,11 @@ public void onRecord(Value[] fields) {
4650
throw new UnsupportedOperationException(
4751
"Transaction begin is not expected to receive records: " + Arrays.toString(fields));
4852
}
53+
54+
private record BeginSummaryImpl(String database) implements BeginSummary {
55+
@Override
56+
public Optional<String> databaseName() {
57+
return Optional.ofNullable(database);
58+
}
59+
}
4960
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/handlers/HelloV51ResponseHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setConnectionId;
2020
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setConnectionReadTimeout;
2121
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setServerAgent;
22+
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setSsrEnabled;
2223
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setTelemetryEnabled;
2324
import static org.neo4j.driver.internal.bolt.basicimpl.impl.util.MetadataExtractor.extractServer;
2425

@@ -35,6 +36,7 @@ public class HelloV51ResponseHandler implements ResponseHandler {
3536
public static final String CONFIGURATION_HINTS_KEY = "hints";
3637
public static final String CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY = "connection.recv_timeout_seconds";
3738
public static final String TELEMETRY_ENABLED_KEY = "telemetry.enabled";
39+
public static final String SSR_ENABLED_KEY = "ssr.enabled";
3840

3941
private final Channel channel;
4042
private final CompletableFuture<String> helloFuture;
@@ -85,6 +87,12 @@ private void processConfigurationHints(Map<String, Value> metadata) {
8587
return !value.isNull() && value.asBoolean();
8688
})
8789
.ifPresent(telemetryEnabled -> setTelemetryEnabled(channel, telemetryEnabled));
90+
91+
getFromSupplierOrEmptyOnException(() -> {
92+
var value = configurationHints.get(SSR_ENABLED_KEY);
93+
return !value.isNull() && value.asBoolean();
94+
})
95+
.ifPresent(telemetryEnabled -> setSsrEnabled(channel, telemetryEnabled));
8896
}
8997
}
9098

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/handlers/RunResponseHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.Optional;
2122
import java.util.concurrent.CompletableFuture;
2223
import org.neo4j.driver.internal.bolt.api.summary.RunSummary;
2324
import org.neo4j.driver.internal.bolt.api.values.Value;
@@ -38,8 +39,10 @@ public void onSuccess(Map<String, Value> metadata) {
3839
var queryKeys = metadataExtractor.extractQueryKeys(metadata);
3940
var resultAvailableAfter = metadataExtractor.extractResultAvailableAfter(metadata);
4041
var queryId = metadataExtractor.extractQueryId(metadata);
42+
var db = metadata.get("db");
43+
var databaseName = db != null ? db.asString() : null;
4144

42-
runFuture.complete(new RunResponseImpl(queryId, queryKeys, resultAvailableAfter));
45+
runFuture.complete(new RunResponseImpl(queryId, queryKeys, resultAvailableAfter, databaseName));
4346
}
4447

4548
@Override
@@ -52,5 +55,11 @@ public void onRecord(Value[] fields) {
5255
throw new UnsupportedOperationException();
5356
}
5457

55-
private record RunResponseImpl(long queryId, List<String> keys, long resultAvailableAfter) implements RunSummary {}
58+
private record RunResponseImpl(long queryId, List<String> keys, long resultAvailableAfter, String database)
59+
implements RunSummary {
60+
@Override
61+
public Optional<String> databaseName() {
62+
return Optional.ofNullable(database);
63+
}
64+
}
5665
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/messaging/BoltProtocol.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.bolt.api.RoutingContext;
3535
import org.neo4j.driver.internal.bolt.api.exception.BoltClientException;
3636
import org.neo4j.driver.internal.bolt.api.exception.BoltUnsupportedFeatureException;
37+
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
3738
import org.neo4j.driver.internal.bolt.api.summary.DiscardSummary;
3839
import org.neo4j.driver.internal.bolt.api.summary.RouteSummary;
3940
import org.neo4j.driver.internal.bolt.api.summary.RunSummary;
@@ -53,6 +54,7 @@
5354
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v55.BoltProtocolV55;
5455
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v56.BoltProtocolV56;
5556
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
57+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;
5658
import org.neo4j.driver.internal.bolt.basicimpl.impl.spi.Connection;
5759

5860
public interface BoltProtocol {
@@ -90,7 +92,7 @@ CompletionStage<Void> beginTransaction(
9092
Map<String, Value> txMetadata,
9193
String txType,
9294
NotificationConfig notificationConfig,
93-
MessageHandler<Void> handler,
95+
MessageHandler<BeginSummary> handler,
9496
LoggingProvider logging,
9597
ValueFactory valueFactory);
9698

@@ -182,6 +184,8 @@ static BoltProtocol forVersion(BoltProtocolVersion version) {
182184
return BoltProtocolV56.INSTANCE;
183185
} else if (BoltProtocolV57.VERSION.equals(version)) {
184186
return BoltProtocolV57.INSTANCE;
187+
} else if (BoltProtocolV58.VERSION.equals(version)) {
188+
return BoltProtocolV58.INSTANCE;
185189
}
186190
throw new BoltClientException("Unknown protocol version: " + version);
187191
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/messaging/v3/BoltProtocolV3.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.neo4j.driver.internal.bolt.api.RoutingContext;
4343
import org.neo4j.driver.internal.bolt.api.exception.BoltException;
4444
import org.neo4j.driver.internal.bolt.api.exception.BoltUnsupportedFeatureException;
45+
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
4546
import org.neo4j.driver.internal.bolt.api.summary.DiscardSummary;
4647
import org.neo4j.driver.internal.bolt.api.summary.PullSummary;
4748
import org.neo4j.driver.internal.bolt.api.summary.RouteSummary;
@@ -249,7 +250,7 @@ public CompletionStage<Void> beginTransaction(
249250
Map<String, Value> txMetadata,
250251
String txType,
251252
NotificationConfig notificationConfig,
252-
MessageHandler<Void> handler,
253+
MessageHandler<BeginSummary> handler,
253254
LoggingProvider logging,
254255
ValueFactory valueFactory) {
255256
var exception = verifyNotificationConfigSupported(notificationConfig);
@@ -262,7 +263,7 @@ public CompletionStage<Void> beginTransaction(
262263
return CompletableFuture.failedFuture(error);
263264
}
264265

265-
var beginTxFuture = new CompletableFuture<Void>();
266+
var beginTxFuture = new CompletableFuture<BeginSummary>();
266267
var beginMessage = new BeginMessage(
267268
bookmarks,
268269
txTimeout,
@@ -275,11 +276,11 @@ public CompletionStage<Void> beginTransaction(
275276
useLegacyNotifications(),
276277
logging,
277278
valueFactory);
278-
beginTxFuture.whenComplete((ignored, throwable) -> {
279+
beginTxFuture.whenComplete((summary, throwable) -> {
279280
if (throwable != null) {
280281
handler.onError(throwable);
281282
} else {
282-
handler.onSummary(null);
283+
handler.onSummary(summary);
283284
}
284285
});
285286
return connection.write(beginMessage, new BeginTxResponseHandler(beginTxFuture));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58;
18+
19+
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
20+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
21+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
22+
23+
public class BoltProtocolV58 extends BoltProtocolV57 {
24+
public static final BoltProtocolVersion VERSION = new BoltProtocolVersion(5, 8);
25+
public static final BoltProtocol INSTANCE = new BoltProtocolV58();
26+
27+
@Override
28+
public BoltProtocolVersion version() {
29+
return VERSION;
30+
}
31+
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public interface Connection {
3535

3636
boolean isTelemetryEnabled();
3737

38+
boolean isSsrEnabled();
39+
3840
String serverAgent();
3941

4042
BoltServerAddress serverAddress();

0 commit comments

Comments
 (0)