Skip to content

Update sub millisecond transaction timeout handling #1517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class NetworkSession {
private final NetworkSessionConnectionContext connectionContext;
private final AccessMode mode;
private final RetryLogic retryLogic;
private final Logging logging;
protected final Logger log;

private final BookmarkHolder bookmarkHolder;
Expand All @@ -74,6 +75,7 @@ public NetworkSession(
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.logging = logging;
this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass()));
this.bookmarkHolder = bookmarkHolder;
CompletableFuture<DatabaseName> databaseNameFuture = databaseName
Expand Down Expand Up @@ -116,7 +118,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(AccessMode mo
.thenApply(connection ->
ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser()))
.thenCompose(connection -> {
UnmanagedTransaction tx = new UnmanagedTransaction(connection, bookmarkHolder, fetchSize);
UnmanagedTransaction tx = new UnmanagedTransaction(connection, bookmarkHolder, fetchSize, logging);
return tx.beginAsync(bookmarkHolder.getBookmark(), config);
});

Expand Down Expand Up @@ -226,7 +228,8 @@ private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query quer
try {
ResultCursorFactory factory = connection
.protocol()
.runInAutoCommitTransaction(connection, query, bookmarkHolder, config, fetchSize);
.runInAutoCommitTransaction(
connection, query, bookmarkHolder, config, fetchSize, logging);
return completedFuture(factory);
} catch (Throwable e) {
return Futures.failedFuture(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;
Expand Down Expand Up @@ -91,34 +92,41 @@ private enum State {
private CompletableFuture<Void> commitFuture;
private CompletableFuture<Void> rollbackFuture;
private Throwable causeOfTermination;
private final Logging logging;

public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize) {
this(connection, bookmarkHolder, fetchSize, new ResultCursorsHolder());
public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, Logging logging) {
this(connection, bookmarkHolder, fetchSize, new ResultCursorsHolder(), logging);
}

protected UnmanagedTransaction(
Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, ResultCursorsHolder resultCursors) {
Connection connection,
BookmarkHolder bookmarkHolder,
long fetchSize,
ResultCursorsHolder resultCursors,
Logging logging) {
this.connection = connection;
this.protocol = connection.protocol();
this.bookmarkHolder = bookmarkHolder;
this.resultCursors = resultCursors;
this.fetchSize = fetchSize;
this.logging = logging;
}

public CompletionStage<UnmanagedTransaction> beginAsync(Bookmark initialBookmark, TransactionConfig config) {
return protocol.beginTransaction(connection, initialBookmark, config).handle((ignore, beginError) -> {
if (beginError != null) {
if (beginError instanceof AuthorizationExpiredException) {
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
} else if (beginError instanceof ConnectionReadTimeoutException) {
connection.terminateAndRelease(beginError.getMessage());
} else {
connection.release();
}
throw asCompletionException(beginError);
}
return this;
});
return protocol.beginTransaction(connection, initialBookmark, config, logging)
.handle((ignore, beginError) -> {
if (beginError != null) {
if (beginError instanceof AuthorizationExpiredException) {
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
} else if (beginError instanceof ConnectionReadTimeoutException) {
connection.terminateAndRelease(beginError.getMessage());
} else {
connection.release();
}
throw asCompletionException(beginError);
}
return this;
});
}

public CompletionStage<Void> closeAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.BookmarkHolder;
Expand All @@ -42,8 +43,8 @@ public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingP
static final String MULTI_DB_GET_ROUTING_TABLE =
String.format("CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME);

public MultiDatabasesRoutingProcedureRunner(RoutingContext context) {
super(context);
public MultiDatabasesRoutingProcedureRunner(RoutingContext context, Logging logging) {
super(context, logging);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ProtocolException;
Expand All @@ -42,11 +43,11 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;

public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext) {
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext, Logging logging) {
this(
clock,
new SingleDatabaseRoutingProcedureRunner(routingContext),
new MultiDatabasesRoutingProcedureRunner(routingContext),
new SingleDatabaseRoutingProcedureRunner(routingContext, logging),
new MultiDatabasesRoutingProcedureRunner(routingContext, logging),
new RouteMessageRoutingProcedureRunner(routingContext));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.TransactionConfig;
Expand All @@ -49,9 +50,11 @@ public class SingleDatabaseRoutingProcedureRunner implements RoutingProcedureRun
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";

final RoutingContext context;
private Logging logging;

public SingleDatabaseRoutingProcedureRunner(RoutingContext context) {
public SingleDatabaseRoutingProcedureRunner(RoutingContext context, Logging logging) {
this.context = context;
this.logging = logging;
}

@Override
Expand Down Expand Up @@ -87,7 +90,7 @@ CompletionStage<List<Record>> runProcedure(Connection connection, Query procedur
return connection
.protocol()
.runInAutoCommitTransaction(
connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)
connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE, logging)
.asyncResult()
.thenCompose(ResultCursor::listAsync);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private static Rediscovery createRediscovery(
Logging logging,
DomainNameResolver domainNameResolver) {
ClusterCompositionProvider clusterCompositionProvider =
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext(), logging);
return new RediscoveryImpl(
initialRouter,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
Expand Down Expand Up @@ -77,9 +78,11 @@ void initializeChannel(
* @param connection the connection to use.
* @param bookmark the bookmarks. Never null, should be {@link InternalBookmark#empty()} when absent.
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
* @param logging the driver logging
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
*/
CompletionStage<Void> beginTransaction(Connection connection, Bookmark bookmark, TransactionConfig config);
CompletionStage<Void> beginTransaction(
Connection connection, Bookmark bookmark, TransactionConfig config, Logging logging);

/**
* Commit the unmanaged transaction.
Expand All @@ -105,14 +108,16 @@ void initializeChannel(
* @param bookmarkHolder the bookmarksHolder that keeps track of the current bookmark and can be updated with a new bookmark.
* @param config the transaction config for the implicitly started auto-commit transaction.
* @param fetchSize the record fetch size for PULL message.
* @param logging the driver logging
* @return stage with cursor.
*/
ResultCursorFactory runInAutoCommitTransaction(
Connection connection,
Query query,
BookmarkHolder bookmarkHolder,
TransactionConfig config,
long fetchSize);
long fetchSize,
Logging logging);

/**
* Execute the given query in a running unmanaged transaction, i.e. {@link Transaction#run(Query)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.DatabaseName;
Expand All @@ -37,8 +38,9 @@ public BeginMessage(
TransactionConfig config,
DatabaseName databaseName,
AccessMode mode,
String impersonatedUser) {
this(bookmark, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser);
String impersonatedUser,
Logging logging) {
this(bookmark, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser, logging);
}

public BeginMessage(
Expand All @@ -47,8 +49,9 @@ public BeginMessage(
Map<String, Value> txMetadata,
AccessMode mode,
DatabaseName databaseName,
String impersonatedUser) {
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser));
String impersonatedUser,
Logging logging) {
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser, logging));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Objects;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
Expand All @@ -44,9 +45,10 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
DatabaseName databaseName,
AccessMode mode,
Bookmark bookmark,
String impersonatedUser) {
String impersonatedUser,
Logging logging) {
return autoCommitTxRunMessage(
query, config.timeout(), config.metadata(), databaseName, mode, bookmark, impersonatedUser);
query, config.timeout(), config.metadata(), databaseName, mode, bookmark, impersonatedUser, logging);
}

public static RunWithMetadataMessage autoCommitTxRunMessage(
Expand All @@ -56,9 +58,10 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
DatabaseName databaseName,
AccessMode mode,
Bookmark bookmark,
String impersonatedUser) {
String impersonatedUser,
Logging logging) {
Map<String, Value> metadata =
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser);
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser, logging);
return new RunWithMetadataMessage(query.text(), query.parameters().asMap(ofValue()), metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.util.Iterables;
Expand All @@ -44,8 +46,9 @@ public static Map<String, Value> buildMetadata(
Map<String, Value> txMetadata,
AccessMode mode,
Bookmark bookmark,
String impersonatedUser) {
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmark, impersonatedUser);
String impersonatedUser,
Logging logging) {
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmark, impersonatedUser, logging);
}

public static Map<String, Value> buildMetadata(
Expand All @@ -54,7 +57,8 @@ public static Map<String, Value> buildMetadata(
DatabaseName databaseName,
AccessMode mode,
Bookmark bookmark,
String impersonatedUser) {
String impersonatedUser,
Logging logging) {
boolean bookmarksPresent = bookmark != null && !bookmark.isEmpty();
boolean txTimeoutPresent = txTimeout != null;
boolean txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
Expand All @@ -77,7 +81,14 @@ public static Map<String, Value> buildMetadata(
result.put(BOOKMARKS_METADATA_KEY, value(bookmark.values()));
}
if (txTimeoutPresent) {
result.put(TX_TIMEOUT_METADATA_KEY, value(txTimeout.toMillis()));
long millis = txTimeout.toMillis();
if (txTimeout.getNano() % 1_000_000 > 0) {
Logger log = logging.getLog(TransactionMetadataBuilder.class);
millis++;
log.info(
"The transaction timeout has been rounded up to next millisecond value since the config had a fractional millisecond value");
}
result.put(TX_TIMEOUT_METADATA_KEY, value(millis));
}
if (txMetadataPresent) {
result.put(TX_METADATA_METADATA_KEY, value(txMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.internal.BookmarkHolder;
Expand Down Expand Up @@ -110,7 +111,8 @@ public void prepareToCloseChannel(Channel channel) {
}

@Override
public CompletionStage<Void> beginTransaction(Connection connection, Bookmark bookmark, TransactionConfig config) {
public CompletionStage<Void> beginTransaction(
Connection connection, Bookmark bookmark, TransactionConfig config, Logging logging) {
try {
verifyDatabaseNameBeforeTransaction(connection.databaseName());
} catch (Exception error) {
Expand All @@ -119,7 +121,7 @@ public CompletionStage<Void> beginTransaction(Connection connection, Bookmark bo

CompletableFuture<Void> beginTxFuture = new CompletableFuture<>();
BeginMessage beginMessage = new BeginMessage(
bookmark, config, connection.databaseName(), connection.mode(), connection.impersonatedUser());
bookmark, config, connection.databaseName(), connection.mode(), connection.impersonatedUser(), logging);
connection.writeAndFlush(beginMessage, new BeginTxResponseHandler(beginTxFuture));
return beginTxFuture;
}
Expand All @@ -144,15 +146,17 @@ public ResultCursorFactory runInAutoCommitTransaction(
Query query,
BookmarkHolder bookmarkHolder,
TransactionConfig config,
long fetchSize) {
long fetchSize,
Logging logging) {
verifyDatabaseNameBeforeTransaction(connection.databaseName());
RunWithMetadataMessage runMessage = autoCommitTxRunMessage(
query,
config,
connection.databaseName(),
connection.mode(),
bookmarkHolder.getBookmark(),
connection.impersonatedUser());
connection.impersonatedUser(),
logging);
return buildResultCursorFactory(connection, query, bookmarkHolder, null, runMessage, fetchSize);
}

Expand Down
Loading