Skip to content

Commit fb197d5

Browse files
committed
Update sub millisecond transaction timeout handling
If transaction timeout value has a sub millisecond value, it gets rounded up to next millisecond value.
1 parent bdbf958 commit fb197d5

35 files changed

+914
-300
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ protected Rediscovery createRediscovery(
321321
Logging logging,
322322
DomainNameResolver domainNameResolver) {
323323
var clusterCompositionProvider =
324-
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
324+
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext(), logging);
325325
return new RediscoveryImpl(initialRouter, clusterCompositionProvider, resolver, logging, domainNameResolver);
326326
}
327327

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class NetworkSession {
6767
private final NetworkSessionConnectionContext connectionContext;
6868
private final AccessMode mode;
6969
private final RetryLogic retryLogic;
70+
private final Logging logging;
7071
protected final Logger log;
7172

7273
private final long fetchSize;
@@ -97,6 +98,7 @@ public NetworkSession(
9798
this.connectionProvider = connectionProvider;
9899
this.mode = mode;
99100
this.retryLogic = retryLogic;
101+
this.logging = logging;
100102
this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass()));
101103
var databaseNameFuture = databaseName
102104
.databaseName()
@@ -150,7 +152,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
150152
ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser()))
151153
.thenCompose(connection -> {
152154
var tx = new UnmanagedTransaction(
153-
connection, this::handleNewBookmark, fetchSize, notificationConfig);
155+
connection, this::handleNewBookmark, fetchSize, notificationConfig, logging);
154156
return tx.beginAsync(determineBookmarks(true), config, txType);
155157
});
156158

@@ -268,7 +270,8 @@ private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query quer
268270
this::handleNewBookmark,
269271
config,
270272
fetchSize,
271-
notificationConfig);
273+
notificationConfig,
274+
logging);
272275
return completedFuture(factory);
273276
} catch (Throwable e) {
274277
return Futures.failedFuture(e);

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.function.Consumer;
3838
import java.util.function.Function;
3939
import org.neo4j.driver.Bookmark;
40+
import org.neo4j.driver.Logging;
4041
import org.neo4j.driver.NotificationConfig;
4142
import org.neo4j.driver.Query;
4243
import org.neo4j.driver.TransactionConfig;
@@ -98,34 +99,38 @@ private enum State {
9899
private Throwable causeOfTermination;
99100
private CompletionStage<Void> terminationStage;
100101
private final NotificationConfig notificationConfig;
102+
private final Logging logging;
101103

102104
public UnmanagedTransaction(
103105
Connection connection,
104106
Consumer<DatabaseBookmark> bookmarkConsumer,
105107
long fetchSize,
106-
NotificationConfig notificationConfig) {
107-
this(connection, bookmarkConsumer, fetchSize, new ResultCursorsHolder(), notificationConfig);
108+
NotificationConfig notificationConfig,
109+
Logging logging) {
110+
this(connection, bookmarkConsumer, fetchSize, new ResultCursorsHolder(), notificationConfig, logging);
108111
}
109112

110113
protected UnmanagedTransaction(
111114
Connection connection,
112115
Consumer<DatabaseBookmark> bookmarkConsumer,
113116
long fetchSize,
114117
ResultCursorsHolder resultCursors,
115-
NotificationConfig notificationConfig) {
118+
NotificationConfig notificationConfig,
119+
Logging logging) {
116120
this.connection = connection;
117121
this.protocol = connection.protocol();
118122
this.bookmarkConsumer = bookmarkConsumer;
119123
this.resultCursors = resultCursors;
120124
this.fetchSize = fetchSize;
121125
this.notificationConfig = notificationConfig;
126+
this.logging = logging;
122127

123128
connection.bindTerminationAwareStateLockingExecutor(this);
124129
}
125130

126131
public CompletionStage<UnmanagedTransaction> beginAsync(
127132
Set<Bookmark> initialBookmarks, TransactionConfig config, String txType) {
128-
return protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig)
133+
return protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig, logging)
129134
.handle((ignore, beginError) -> {
130135
if (beginError != null) {
131136
if (beginError instanceof AuthorizationExpiredException) {

driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Set;
2626
import org.neo4j.driver.AccessMode;
2727
import org.neo4j.driver.Bookmark;
28+
import org.neo4j.driver.Logging;
2829
import org.neo4j.driver.Query;
2930
import org.neo4j.driver.Value;
3031
import org.neo4j.driver.internal.DatabaseName;
@@ -41,8 +42,8 @@ public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingP
4142
static final String MULTI_DB_GET_ROUTING_TABLE =
4243
String.format("CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME);
4344

44-
public MultiDatabasesRoutingProcedureRunner(RoutingContext context) {
45-
super(context);
45+
public MultiDatabasesRoutingProcedureRunner(RoutingContext context, Logging logging) {
46+
super(context, logging);
4647
}
4748

4849
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.CompletionException;
2828
import java.util.concurrent.CompletionStage;
2929
import org.neo4j.driver.Bookmark;
30+
import org.neo4j.driver.Logging;
3031
import org.neo4j.driver.exceptions.ProtocolException;
3132
import org.neo4j.driver.exceptions.value.ValueException;
3233
import org.neo4j.driver.internal.DatabaseName;
@@ -40,11 +41,11 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos
4041
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
4142
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;
4243

43-
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext) {
44+
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext, Logging logging) {
4445
this(
4546
clock,
46-
new SingleDatabaseRoutingProcedureRunner(routingContext),
47-
new MultiDatabasesRoutingProcedureRunner(routingContext),
47+
new SingleDatabaseRoutingProcedureRunner(routingContext, logging),
48+
new MultiDatabasesRoutingProcedureRunner(routingContext, logging),
4849
new RouteMessageRoutingProcedureRunner(routingContext));
4950
}
5051

driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CompletionStage;
3030
import org.neo4j.driver.AccessMode;
3131
import org.neo4j.driver.Bookmark;
32+
import org.neo4j.driver.Logging;
3233
import org.neo4j.driver.Query;
3334
import org.neo4j.driver.Record;
3435
import org.neo4j.driver.TransactionConfig;
@@ -50,9 +51,11 @@ public class SingleDatabaseRoutingProcedureRunner implements RoutingProcedureRun
5051
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";
5152

5253
final RoutingContext context;
54+
private Logging logging;
5355

54-
public SingleDatabaseRoutingProcedureRunner(RoutingContext context) {
56+
public SingleDatabaseRoutingProcedureRunner(RoutingContext context, Logging logging) {
5557
this.context = context;
58+
this.logging = logging;
5659
}
5760

5861
@Override
@@ -93,7 +96,8 @@ CompletionStage<List<Record>> runProcedure(Connection connection, Query procedur
9396
(ignored) -> {},
9497
TransactionConfig.empty(),
9598
UNLIMITED_FETCH_SIZE,
96-
null)
99+
null,
100+
logging)
97101
.asyncResult()
98102
.thenCompose(ResultCursor::listAsync);
99103
}

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.function.Consumer;
2929
import org.neo4j.driver.AuthToken;
3030
import org.neo4j.driver.Bookmark;
31+
import org.neo4j.driver.Logging;
3132
import org.neo4j.driver.NotificationConfig;
3233
import org.neo4j.driver.Query;
3334
import org.neo4j.driver.Session;
@@ -93,14 +94,16 @@ void initializeChannel(
9394
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
9495
* @param txType the Kernel transaction type
9596
* @param notificationConfig the notification configuration
97+
* @param logging the driver logging
9698
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
9799
*/
98100
CompletionStage<Void> beginTransaction(
99101
Connection connection,
100102
Set<Bookmark> bookmarks,
101103
TransactionConfig config,
102104
String txType,
103-
NotificationConfig notificationConfig);
105+
NotificationConfig notificationConfig,
106+
Logging logging);
104107

105108
/**
106109
* Commit the unmanaged transaction.
@@ -127,6 +130,7 @@ CompletionStage<Void> beginTransaction(
127130
* @param config the transaction config for the implicitly started auto-commit transaction.
128131
* @param fetchSize the record fetch size for PULL message.
129132
* @param notificationConfig the notification configuration
133+
* @param logging the driver logging
130134
* @return stage with cursor.
131135
*/
132136
ResultCursorFactory runInAutoCommitTransaction(
@@ -136,7 +140,8 @@ ResultCursorFactory runInAutoCommitTransaction(
136140
Consumer<DatabaseBookmark> bookmarkConsumer,
137141
TransactionConfig config,
138142
long fetchSize,
139-
NotificationConfig notificationConfig);
143+
NotificationConfig notificationConfig,
144+
Logging logging);
140145

141146
/**
142147
* Execute the given query in a running unmanaged transaction, i.e. {@link Transaction#run(Query)}.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Set;
2727
import org.neo4j.driver.AccessMode;
2828
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.Logging;
2930
import org.neo4j.driver.NotificationConfig;
3031
import org.neo4j.driver.TransactionConfig;
3132
import org.neo4j.driver.Value;
@@ -41,7 +42,8 @@ public BeginMessage(
4142
AccessMode mode,
4243
String impersonatedUser,
4344
String txType,
44-
NotificationConfig notificationConfig) {
45+
NotificationConfig notificationConfig,
46+
Logging logging) {
4547
this(
4648
bookmarks,
4749
config.timeout(),
@@ -50,7 +52,8 @@ public BeginMessage(
5052
databaseName,
5153
impersonatedUser,
5254
txType,
53-
notificationConfig);
55+
notificationConfig,
56+
logging);
5457
}
5558

5659
public BeginMessage(
@@ -61,9 +64,18 @@ public BeginMessage(
6164
DatabaseName databaseName,
6265
String impersonatedUser,
6366
String txType,
64-
NotificationConfig notificationConfig) {
67+
NotificationConfig notificationConfig,
68+
Logging logging) {
6569
super(buildMetadata(
66-
txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, txType, notificationConfig));
70+
txTimeout,
71+
txMetadata,
72+
databaseName,
73+
mode,
74+
bookmarks,
75+
impersonatedUser,
76+
txType,
77+
notificationConfig,
78+
logging));
6779
}
6880

6981
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import org.neo4j.driver.AccessMode;
3030
import org.neo4j.driver.Bookmark;
31+
import org.neo4j.driver.Logging;
3132
import org.neo4j.driver.NotificationConfig;
3233
import org.neo4j.driver.Query;
3334
import org.neo4j.driver.TransactionConfig;
@@ -47,7 +48,8 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
4748
AccessMode mode,
4849
Set<Bookmark> bookmarks,
4950
String impersonatedUser,
50-
NotificationConfig notificationConfig) {
51+
NotificationConfig notificationConfig,
52+
Logging logging) {
5153
return autoCommitTxRunMessage(
5254
query,
5355
config.timeout(),
@@ -56,7 +58,8 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
5658
mode,
5759
bookmarks,
5860
impersonatedUser,
59-
notificationConfig);
61+
notificationConfig,
62+
logging);
6063
}
6164

6265
public static RunWithMetadataMessage autoCommitTxRunMessage(
@@ -67,9 +70,18 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
6770
AccessMode mode,
6871
Set<Bookmark> bookmarks,
6972
String impersonatedUser,
70-
NotificationConfig notificationConfig) {
73+
NotificationConfig notificationConfig,
74+
Logging logging) {
7175
var metadata = buildMetadata(
72-
txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, null, notificationConfig);
76+
txTimeout,
77+
txMetadata,
78+
databaseName,
79+
mode,
80+
bookmarks,
81+
impersonatedUser,
82+
null,
83+
notificationConfig,
84+
logging);
7385
return new RunWithMetadataMessage(query.text(), query.parameters().asMap(ofValue()), metadata);
7486
}
7587

driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionMetadataBuilder.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020

2121
import static java.util.Collections.emptyMap;
2222
import static org.neo4j.driver.Values.value;
23-
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
2423

2524
import java.time.Duration;
2625
import java.util.Map;
2726
import java.util.Set;
2827
import org.neo4j.driver.AccessMode;
2928
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.Logging;
3030
import org.neo4j.driver.NotificationConfig;
3131
import org.neo4j.driver.Value;
3232
import org.neo4j.driver.internal.DatabaseName;
@@ -42,16 +42,6 @@ public class TransactionMetadataBuilder {
4242
private static final String IMPERSONATED_USER_KEY = "imp_user";
4343
private static final String TX_TYPE_KEY = "tx_type";
4444

45-
public static Map<String, Value> buildMetadata(
46-
Duration txTimeout,
47-
Map<String, Value> txMetadata,
48-
AccessMode mode,
49-
Set<Bookmark> bookmarks,
50-
String impersonatedUser,
51-
String txType) {
52-
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmarks, impersonatedUser, txType, null);
53-
}
54-
5545
public static Map<String, Value> buildMetadata(
5646
Duration txTimeout,
5747
Map<String, Value> txMetadata,
@@ -60,7 +50,8 @@ public static Map<String, Value> buildMetadata(
6050
Set<Bookmark> bookmarks,
6151
String impersonatedUser,
6252
String txType,
63-
NotificationConfig notificationConfig) {
53+
NotificationConfig notificationConfig,
54+
Logging logging) {
6455
var bookmarksPresent = !bookmarks.isEmpty();
6556
var txTimeoutPresent = txTimeout != null;
6657
var txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
@@ -87,7 +78,14 @@ public static Map<String, Value> buildMetadata(
8778
result.put(BOOKMARKS_METADATA_KEY, value(bookmarks.stream().map(Bookmark::value)));
8879
}
8980
if (txTimeoutPresent) {
90-
result.put(TX_TIMEOUT_METADATA_KEY, value(txTimeout.toMillis()));
81+
var millis = txTimeout.toMillis();
82+
if (txTimeout.toNanosPart() % 1_000_000 > 0) {
83+
var log = logging.getLog(TransactionMetadataBuilder.class);
84+
millis++;
85+
log.info(
86+
"The transaction timeout has been rounded up to next millisecond value since the config had a fractional millisecond value");
87+
}
88+
result.put(TX_TIMEOUT_METADATA_KEY, value(millis));
9189
}
9290
if (txMetadataPresent) {
9391
result.put(TX_METADATA_METADATA_KEY, value(txMetadata));

0 commit comments

Comments
 (0)