Skip to content

Introduce home database resolution cache #1600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public CompletionStage<BoltConnection> connect(
String impersonatedUser,
BoltProtocolVersion minVersion,
NotificationConfig notificationConfig,
Consumer<DatabaseName> databaseNameConsumer) {
Consumer<DatabaseName> databaseNameConsumer,
Map<String, Object> additionalParameters) {
synchronized (this) {
if (closeFuture != null) {
return CompletableFuture.failedFuture(new IllegalStateException("Connection provider is closed."));
Expand Down Expand Up @@ -189,7 +190,8 @@ public CompletionStage<Void> verifyConnectivity(SecurityPlan securityPlan, Map<S
null,
null,
null,
(ignored) -> {})
(ignored) -> {},
Collections.emptyMap())
.thenCompose(BoltConnection::close);
}

Expand All @@ -204,7 +206,8 @@ public CompletionStage<Boolean> supportsMultiDb(SecurityPlan securityPlan, Map<S
null,
null,
null,
(ignored) -> {})
(ignored) -> {},
Collections.emptyMap())
.thenCompose(boltConnection -> {
var supports = boltConnection.protocolVersion().compareTo(BoltProtocolV4.VERSION) >= 0;
return boltConnection.close().thenApply(ignored -> supports);
Expand All @@ -222,7 +225,8 @@ public CompletionStage<Boolean> supportsSessionAuth(SecurityPlan securityPlan, M
null,
null,
null,
(ignored) -> {})
(ignored) -> {},
Collections.emptyMap())
.thenCompose(boltConnection -> {
var supports = BoltProtocolV51.VERSION.compareTo(boltConnection.protocolVersion()) <= 0;
return boltConnection.close().thenApply(ignored -> supports);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public final class BoltConnectionImpl implements BoltConnection {
private final BoltServerAddress serverAddress;
private final BoltProtocolVersion protocolVersion;
private final boolean telemetrySupported;
private final boolean serverSideRouting;
private final AtomicReference<BoltConnectionState> stateRef = new AtomicReference<>(BoltConnectionState.OPEN);
private final AtomicReference<CompletableFuture<AuthData>> authDataRef;
private final Map<String, Value> routingContext;
Expand All @@ -104,6 +105,7 @@ public BoltConnectionImpl(
this.serverAddress = Objects.requireNonNull(connection.serverAddress());
this.protocolVersion = Objects.requireNonNull(connection.protocol().version());
this.telemetrySupported = connection.isTelemetryEnabled();
this.serverSideRouting = connection.isSsrEnabled();
this.authDataRef = new AtomicReference<>(
CompletableFuture.completedFuture(new AuthDataImpl(authMap, latestAuthMillisFuture.join())));
this.valueFactory = Objects.requireNonNull(valueFactory);
Expand Down Expand Up @@ -177,8 +179,8 @@ public void onError(Throwable throwable) {
}

@Override
public void onSummary(Void summary) {
handler.onBeginSummary(BeginSummaryImpl.INSTANCE);
public void onSummary(BeginSummary summary) {
handler.onBeginSummary(summary);
}
},
logging,
Expand Down Expand Up @@ -520,6 +522,11 @@ public boolean telemetrySupported() {
return telemetrySupported;
}

@Override
public boolean serverSideRoutingEnabled() {
return serverSideRouting;
}

private CompletionStage<Void> executeInEventLoop(Runnable runnable) {
var executeFuture = new CompletableFuture<Void>();
Runnable stageCompletingRunnable = () -> {
Expand Down Expand Up @@ -720,10 +727,6 @@ private void runIgnoringError(Runnable runnable) {
}
}

private static class BeginSummaryImpl implements BeginSummary {
private static final BeginSummary INSTANCE = new BeginSummaryImpl();
}

private static class TelemetrySummaryImpl implements TelemetrySummary {
private static final TelemetrySummary INSTANCE = new TelemetrySummaryImpl();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class NetworkConnection implements Connection {
private final String serverAgent;
private final BoltServerAddress serverAddress;
private final boolean telemetryEnabled;
private final boolean ssrEnabled;
private final BoltProtocol protocol;

private final Long connectionReadTimeout;
Expand All @@ -67,6 +68,7 @@ public NetworkConnection(Channel channel, LoggingProvider logging) {
this.serverAgent = ChannelAttributes.serverAgent(channel);
this.serverAddress = ChannelAttributes.serverAddress(channel);
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
this.ssrEnabled = ChannelAttributes.ssrEnabled(channel);
this.protocol = BoltProtocol.forChannel(channel);
this.connectionReadTimeout =
ChannelAttributes.connectionReadTimeout(channel).orElse(null);
Expand Down Expand Up @@ -111,6 +113,11 @@ public boolean isTelemetryEnabled() {
return telemetryEnabled;
}

@Override
public boolean isSsrEnabled() {
return ssrEnabled;
}

@Override
public String serverAgent() {
return serverAgent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v42.BoltProtocolV42;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v44.BoltProtocolV44;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v5.BoltProtocolV5;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;

public final class BoltProtocolUtil {
public static final int BOLT_MAGIC_PREAMBLE = 0x6060B017;
Expand All @@ -39,7 +39,7 @@ public final class BoltProtocolUtil {

private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer(copyInt(
BOLT_MAGIC_PREAMBLE,
BoltProtocolV57.VERSION.toIntRange(BoltProtocolV5.VERSION),
BoltProtocolV58.VERSION.toIntRange(BoltProtocolV5.VERSION),
BoltProtocolV44.VERSION.toIntRange(BoltProtocolV42.VERSION),
BoltProtocolV41.VERSION.toInt(),
BoltProtocolV3.VERSION.toInt()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public final class ChannelAttributes {

// configuration hints provided by the server
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance("connectionReadTimeout");

private static final AttributeKey<Boolean> TELEMETRY_ENABLED = newInstance("telemetryEnabled");
private static final AttributeKey<Boolean> SSR_ENABLED = newInstance("ssr.enabled");

private ChannelAttributes() {}

Expand Down Expand Up @@ -153,6 +153,14 @@ public static boolean telemetryEnabled(Channel channel) {
return Optional.ofNullable(get(channel, TELEMETRY_ENABLED)).orElse(false);
}

public static void setSsrEnabled(Channel channel, Boolean telemetryEnabled) {
setOnce(channel, SSR_ENABLED, telemetryEnabled);
}

public static boolean ssrEnabled(Channel channel) {
return Optional.ofNullable(get(channel, SSR_ENABLED)).orElse(false);
}

private static <T> T get(Channel channel, AttributeKey<T> key) {
return channel.attr(key).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
import org.neo4j.driver.internal.bolt.api.values.Value;
import org.neo4j.driver.internal.bolt.basicimpl.impl.spi.ResponseHandler;

public class BeginTxResponseHandler implements ResponseHandler {
private final CompletableFuture<Void> beginTxFuture;
private final CompletableFuture<BeginSummary> beginTxFuture;

public BeginTxResponseHandler(CompletableFuture<Void> beginTxFuture) {
public BeginTxResponseHandler(CompletableFuture<BeginSummary> beginTxFuture) {
this.beginTxFuture = requireNonNull(beginTxFuture);
}

@Override
public void onSuccess(Map<String, Value> metadata) {
beginTxFuture.complete(null);
var db = metadata.get("db");
var databaseName = db != null ? db.asString() : null;
beginTxFuture.complete(new BeginSummaryImpl(databaseName));
}

@Override
Expand All @@ -46,4 +50,11 @@ public void onRecord(Value[] fields) {
throw new UnsupportedOperationException(
"Transaction begin is not expected to receive records: " + Arrays.toString(fields));
}

private record BeginSummaryImpl(String database) implements BeginSummary {
@Override
public Optional<String> databaseName() {
return Optional.ofNullable(database);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setConnectionId;
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setConnectionReadTimeout;
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setServerAgent;
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setSsrEnabled;
import static org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection.ChannelAttributes.setTelemetryEnabled;
import static org.neo4j.driver.internal.bolt.basicimpl.impl.util.MetadataExtractor.extractServer;

Expand All @@ -35,6 +36,7 @@ public class HelloV51ResponseHandler implements ResponseHandler {
public static final String CONFIGURATION_HINTS_KEY = "hints";
public static final String CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY = "connection.recv_timeout_seconds";
public static final String TELEMETRY_ENABLED_KEY = "telemetry.enabled";
public static final String SSR_ENABLED_KEY = "ssr.enabled";

private final Channel channel;
private final CompletableFuture<String> helloFuture;
Expand Down Expand Up @@ -85,6 +87,12 @@ private void processConfigurationHints(Map<String, Value> metadata) {
return !value.isNull() && value.asBoolean();
})
.ifPresent(telemetryEnabled -> setTelemetryEnabled(channel, telemetryEnabled));

getFromSupplierOrEmptyOnException(() -> {
var value = configurationHints.get(SSR_ENABLED_KEY);
return !value.isNull() && value.asBoolean();
})
.ifPresent(telemetryEnabled -> setSsrEnabled(channel, telemetryEnabled));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.internal.bolt.api.summary.RunSummary;
import org.neo4j.driver.internal.bolt.api.values.Value;
Expand All @@ -38,8 +39,10 @@ public void onSuccess(Map<String, Value> metadata) {
var queryKeys = metadataExtractor.extractQueryKeys(metadata);
var resultAvailableAfter = metadataExtractor.extractResultAvailableAfter(metadata);
var queryId = metadataExtractor.extractQueryId(metadata);
var db = metadata.get("db");
var databaseName = db != null ? db.asString() : null;

runFuture.complete(new RunResponseImpl(queryId, queryKeys, resultAvailableAfter));
runFuture.complete(new RunResponseImpl(queryId, queryKeys, resultAvailableAfter, databaseName));
}

@Override
Expand All @@ -52,5 +55,11 @@ public void onRecord(Value[] fields) {
throw new UnsupportedOperationException();
}

private record RunResponseImpl(long queryId, List<String> keys, long resultAvailableAfter) implements RunSummary {}
private record RunResponseImpl(long queryId, List<String> keys, long resultAvailableAfter, String database)
implements RunSummary {
@Override
public Optional<String> databaseName() {
return Optional.ofNullable(database);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.driver.internal.bolt.api.RoutingContext;
import org.neo4j.driver.internal.bolt.api.exception.BoltClientException;
import org.neo4j.driver.internal.bolt.api.exception.BoltUnsupportedFeatureException;
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
import org.neo4j.driver.internal.bolt.api.summary.DiscardSummary;
import org.neo4j.driver.internal.bolt.api.summary.RouteSummary;
import org.neo4j.driver.internal.bolt.api.summary.RunSummary;
Expand All @@ -53,6 +54,7 @@
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v55.BoltProtocolV55;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v56.BoltProtocolV56;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;
import org.neo4j.driver.internal.bolt.basicimpl.impl.spi.Connection;

public interface BoltProtocol {
Expand Down Expand Up @@ -90,7 +92,7 @@ CompletionStage<Void> beginTransaction(
Map<String, Value> txMetadata,
String txType,
NotificationConfig notificationConfig,
MessageHandler<Void> handler,
MessageHandler<BeginSummary> handler,
LoggingProvider logging,
ValueFactory valueFactory);

Expand Down Expand Up @@ -182,6 +184,8 @@ static BoltProtocol forVersion(BoltProtocolVersion version) {
return BoltProtocolV56.INSTANCE;
} else if (BoltProtocolV57.VERSION.equals(version)) {
return BoltProtocolV57.INSTANCE;
} else if (BoltProtocolV58.VERSION.equals(version)) {
return BoltProtocolV58.INSTANCE;
}
throw new BoltClientException("Unknown protocol version: " + version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.neo4j.driver.internal.bolt.api.RoutingContext;
import org.neo4j.driver.internal.bolt.api.exception.BoltException;
import org.neo4j.driver.internal.bolt.api.exception.BoltUnsupportedFeatureException;
import org.neo4j.driver.internal.bolt.api.summary.BeginSummary;
import org.neo4j.driver.internal.bolt.api.summary.DiscardSummary;
import org.neo4j.driver.internal.bolt.api.summary.PullSummary;
import org.neo4j.driver.internal.bolt.api.summary.RouteSummary;
Expand Down Expand Up @@ -249,7 +250,7 @@ public CompletionStage<Void> beginTransaction(
Map<String, Value> txMetadata,
String txType,
NotificationConfig notificationConfig,
MessageHandler<Void> handler,
MessageHandler<BeginSummary> handler,
LoggingProvider logging,
ValueFactory valueFactory) {
var exception = verifyNotificationConfigSupported(notificationConfig);
Expand All @@ -262,7 +263,7 @@ public CompletionStage<Void> beginTransaction(
return CompletableFuture.failedFuture(error);
}

var beginTxFuture = new CompletableFuture<Void>();
var beginTxFuture = new CompletableFuture<BeginSummary>();
var beginMessage = new BeginMessage(
bookmarks,
txTimeout,
Expand All @@ -275,11 +276,11 @@ public CompletionStage<Void> beginTransaction(
useLegacyNotifications(),
logging,
valueFactory);
beginTxFuture.whenComplete((ignored, throwable) -> {
beginTxFuture.whenComplete((summary, throwable) -> {
if (throwable != null) {
handler.onError(throwable);
} else {
handler.onSummary(null);
handler.onSummary(summary);
}
});
return connection.write(beginMessage, new BeginTxResponseHandler(beginTxFuture));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58;

import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;

public class BoltProtocolV58 extends BoltProtocolV57 {
public static final BoltProtocolVersion VERSION = new BoltProtocolVersion(5, 8);
public static final BoltProtocol INSTANCE = new BoltProtocolV58();

@Override
public BoltProtocolVersion version() {
return VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface Connection {

boolean isTelemetryEnabled();

boolean isSsrEnabled();

String serverAgent();

BoltServerAddress serverAddress();
Expand Down
Loading