Skip to content

Update sub millisecond transaction timeout handling #1451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ protected Rediscovery createRediscovery(
Logging logging,
DomainNameResolver domainNameResolver) {
var clusterCompositionProvider =
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext(), logging);
return new RediscoveryImpl(initialRouter, clusterCompositionProvider, resolver, logging, domainNameResolver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class NetworkSession {
private final NetworkSessionConnectionContext connectionContext;
private final AccessMode mode;
private final RetryLogic retryLogic;
private final Logging logging;
protected final Logger log;

private final long fetchSize;
Expand Down Expand Up @@ -97,6 +98,7 @@ public NetworkSession(
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.logging = logging;
this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass()));
var databaseNameFuture = databaseName
.databaseName()
Expand Down Expand Up @@ -150,7 +152,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser()))
.thenCompose(connection -> {
var tx = new UnmanagedTransaction(
connection, this::handleNewBookmark, fetchSize, notificationConfig);
connection, this::handleNewBookmark, fetchSize, notificationConfig, logging);
return tx.beginAsync(determineBookmarks(true), config, txType);
});

Expand Down Expand Up @@ -268,7 +270,8 @@ private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query quer
this::handleNewBookmark,
config,
fetchSize,
notificationConfig);
notificationConfig,
logging);
return completedFuture(factory);
} catch (Throwable e) {
return Futures.failedFuture(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
Expand Down Expand Up @@ -98,34 +99,38 @@ private enum State {
private Throwable causeOfTermination;
private CompletionStage<Void> terminationStage;
private final NotificationConfig notificationConfig;
private final Logging logging;

public UnmanagedTransaction(
Connection connection,
Consumer<DatabaseBookmark> bookmarkConsumer,
long fetchSize,
NotificationConfig notificationConfig) {
this(connection, bookmarkConsumer, fetchSize, new ResultCursorsHolder(), notificationConfig);
NotificationConfig notificationConfig,
Logging logging) {
this(connection, bookmarkConsumer, fetchSize, new ResultCursorsHolder(), notificationConfig, logging);
}

protected UnmanagedTransaction(
Connection connection,
Consumer<DatabaseBookmark> bookmarkConsumer,
long fetchSize,
ResultCursorsHolder resultCursors,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
this.connection = connection;
this.protocol = connection.protocol();
this.bookmarkConsumer = bookmarkConsumer;
this.resultCursors = resultCursors;
this.fetchSize = fetchSize;
this.notificationConfig = notificationConfig;
this.logging = logging;

connection.bindTerminationAwareStateLockingExecutor(this);
}

public CompletionStage<UnmanagedTransaction> beginAsync(
Set<Bookmark> initialBookmarks, TransactionConfig config, String txType) {
return protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig)
return protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig, logging)
.handle((ignore, beginError) -> {
if (beginError != null) {
if (beginError instanceof AuthorizationExpiredException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.DatabaseName;
Expand All @@ -41,8 +42,8 @@ public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingP
static final String MULTI_DB_GET_ROUTING_TABLE =
String.format("CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME);

public MultiDatabasesRoutingProcedureRunner(RoutingContext context) {
super(context);
public MultiDatabasesRoutingProcedureRunner(RoutingContext context, Logging logging) {
super(context, logging);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.ProtocolException;
import org.neo4j.driver.exceptions.value.ValueException;
import org.neo4j.driver.internal.DatabaseName;
Expand All @@ -40,11 +41,11 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;

public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext) {
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext, Logging logging) {
this(
clock,
new SingleDatabaseRoutingProcedureRunner(routingContext),
new MultiDatabasesRoutingProcedureRunner(routingContext),
new SingleDatabaseRoutingProcedureRunner(routingContext, logging),
new MultiDatabasesRoutingProcedureRunner(routingContext, logging),
new RouteMessageRoutingProcedureRunner(routingContext));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.TransactionConfig;
Expand All @@ -50,9 +51,11 @@ public class SingleDatabaseRoutingProcedureRunner implements RoutingProcedureRun
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";

final RoutingContext context;
private Logging logging;

public SingleDatabaseRoutingProcedureRunner(RoutingContext context) {
public SingleDatabaseRoutingProcedureRunner(RoutingContext context, Logging logging) {
this.context = context;
this.logging = logging;
}

@Override
Expand Down Expand Up @@ -93,7 +96,8 @@ CompletionStage<List<Record>> runProcedure(Connection connection, Query procedur
(ignored) -> {},
TransactionConfig.empty(),
UNLIMITED_FETCH_SIZE,
null)
null,
logging)
.asyncResult()
.thenCompose(ResultCursor::listAsync);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Consumer;
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
Expand Down Expand Up @@ -93,14 +94,16 @@ void initializeChannel(
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
* @param txType the Kernel transaction type
* @param notificationConfig the notification configuration
* @param logging the driver logging
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
*/
CompletionStage<Void> beginTransaction(
Connection connection,
Set<Bookmark> bookmarks,
TransactionConfig config,
String txType,
NotificationConfig notificationConfig);
NotificationConfig notificationConfig,
Logging logging);

/**
* Commit the unmanaged transaction.
Expand All @@ -127,6 +130,7 @@ CompletionStage<Void> beginTransaction(
* @param config the transaction config for the implicitly started auto-commit transaction.
* @param fetchSize the record fetch size for PULL message.
* @param notificationConfig the notification configuration
* @param logging the driver logging
* @return stage with cursor.
*/
ResultCursorFactory runInAutoCommitTransaction(
Expand All @@ -136,7 +140,8 @@ ResultCursorFactory runInAutoCommitTransaction(
Consumer<DatabaseBookmark> bookmarkConsumer,
TransactionConfig config,
long fetchSize,
NotificationConfig notificationConfig);
NotificationConfig notificationConfig,
Logging logging);

/**
* Execute the given query in a running unmanaged transaction, i.e. {@link Transaction#run(Query)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
Expand All @@ -41,7 +42,8 @@ public BeginMessage(
AccessMode mode,
String impersonatedUser,
String txType,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
this(
bookmarks,
config.timeout(),
Expand All @@ -50,7 +52,8 @@ public BeginMessage(
databaseName,
impersonatedUser,
txType,
notificationConfig);
notificationConfig,
logging);
}

public BeginMessage(
Expand All @@ -61,9 +64,18 @@ public BeginMessage(
DatabaseName databaseName,
String impersonatedUser,
String txType,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
super(buildMetadata(
txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, txType, notificationConfig));
txTimeout,
txMetadata,
databaseName,
mode,
bookmarks,
impersonatedUser,
txType,
notificationConfig,
logging));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
Expand All @@ -47,7 +48,8 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
AccessMode mode,
Set<Bookmark> bookmarks,
String impersonatedUser,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
return autoCommitTxRunMessage(
query,
config.timeout(),
Expand All @@ -56,7 +58,8 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
mode,
bookmarks,
impersonatedUser,
notificationConfig);
notificationConfig,
logging);
}

public static RunWithMetadataMessage autoCommitTxRunMessage(
Expand All @@ -67,9 +70,18 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
AccessMode mode,
Set<Bookmark> bookmarks,
String impersonatedUser,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
var metadata = buildMetadata(
txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, null, notificationConfig);
txTimeout,
txMetadata,
databaseName,
mode,
bookmarks,
impersonatedUser,
null,
notificationConfig,
logging);
return new RunWithMetadataMessage(query.text(), query.parameters().asMap(ofValue()), metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import static java.util.Collections.emptyMap;
import static org.neo4j.driver.Values.value;
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.DatabaseName;
Expand All @@ -42,16 +42,6 @@ public class TransactionMetadataBuilder {
private static final String IMPERSONATED_USER_KEY = "imp_user";
private static final String TX_TYPE_KEY = "tx_type";

public static Map<String, Value> buildMetadata(
Duration txTimeout,
Map<String, Value> txMetadata,
AccessMode mode,
Set<Bookmark> bookmarks,
String impersonatedUser,
String txType) {
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmarks, impersonatedUser, txType, null);
}

public static Map<String, Value> buildMetadata(
Duration txTimeout,
Map<String, Value> txMetadata,
Expand All @@ -60,7 +50,8 @@ public static Map<String, Value> buildMetadata(
Set<Bookmark> bookmarks,
String impersonatedUser,
String txType,
NotificationConfig notificationConfig) {
NotificationConfig notificationConfig,
Logging logging) {
var bookmarksPresent = !bookmarks.isEmpty();
var txTimeoutPresent = txTimeout != null;
var txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
Expand All @@ -87,7 +78,14 @@ public static Map<String, Value> buildMetadata(
result.put(BOOKMARKS_METADATA_KEY, value(bookmarks.stream().map(Bookmark::value)));
}
if (txTimeoutPresent) {
result.put(TX_TIMEOUT_METADATA_KEY, value(txTimeout.toMillis()));
var millis = txTimeout.toMillis();
if (txTimeout.toNanosPart() % 1_000_000 > 0) {
var log = logging.getLog(TransactionMetadataBuilder.class);
millis++;
log.info(
"The transaction timeout has been rounded up to next millisecond value since the config had a fractional millisecond value");
}
result.put(TX_TIMEOUT_METADATA_KEY, value(millis));
}
if (txMetadataPresent) {
result.put(TX_METADATA_METADATA_KEY, value(txMetadata));
Expand Down
Loading