Skip to content

Commit 4672b38

Browse files
committed
Update sub millisecond transaction timeout handling (neo4j#1451)
* Update sub millisecond transaction timeout handling If transaction timeout value has a sub millisecond value, it gets rounded up to next millisecond value. * Add additional test
1 parent ff21d99 commit 4672b38

29 files changed

+450
-218
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class NetworkSession {
5252
private final NetworkSessionConnectionContext connectionContext;
5353
private final AccessMode mode;
5454
private final RetryLogic retryLogic;
55+
private final Logging logging;
5556
protected final Logger log;
5657

5758
private final BookmarkHolder bookmarkHolder;
@@ -74,6 +75,7 @@ public NetworkSession(
7475
this.connectionProvider = connectionProvider;
7576
this.mode = mode;
7677
this.retryLogic = retryLogic;
78+
this.logging = logging;
7779
this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass()));
7880
this.bookmarkHolder = bookmarkHolder;
7981
CompletableFuture<DatabaseName> databaseNameFuture = databaseName
@@ -116,7 +118,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(AccessMode mo
116118
.thenApply(connection ->
117119
ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser()))
118120
.thenCompose(connection -> {
119-
UnmanagedTransaction tx = new UnmanagedTransaction(connection, bookmarkHolder, fetchSize);
121+
UnmanagedTransaction tx = new UnmanagedTransaction(connection, bookmarkHolder, fetchSize, logging);
120122
return tx.beginAsync(bookmarkHolder.getBookmark(), config);
121123
});
122124

@@ -226,7 +228,8 @@ private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query quer
226228
try {
227229
ResultCursorFactory factory = connection
228230
.protocol()
229-
.runInAutoCommitTransaction(connection, query, bookmarkHolder, config, fetchSize);
231+
.runInAutoCommitTransaction(
232+
connection, query, bookmarkHolder, config, fetchSize, logging);
230233
return completedFuture(factory);
231234
} catch (Throwable e) {
232235
return Futures.failedFuture(e);

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.function.BiFunction;
3636
import java.util.function.Function;
3737
import org.neo4j.driver.Bookmark;
38+
import org.neo4j.driver.Logging;
3839
import org.neo4j.driver.Query;
3940
import org.neo4j.driver.Session;
4041
import org.neo4j.driver.TransactionConfig;
@@ -91,34 +92,41 @@ private enum State {
9192
private CompletableFuture<Void> commitFuture;
9293
private CompletableFuture<Void> rollbackFuture;
9394
private Throwable causeOfTermination;
95+
private final Logging logging;
9496

95-
public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize) {
96-
this(connection, bookmarkHolder, fetchSize, new ResultCursorsHolder());
97+
public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, Logging logging) {
98+
this(connection, bookmarkHolder, fetchSize, new ResultCursorsHolder(), logging);
9799
}
98100

99101
protected UnmanagedTransaction(
100-
Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, ResultCursorsHolder resultCursors) {
102+
Connection connection,
103+
BookmarkHolder bookmarkHolder,
104+
long fetchSize,
105+
ResultCursorsHolder resultCursors,
106+
Logging logging) {
101107
this.connection = connection;
102108
this.protocol = connection.protocol();
103109
this.bookmarkHolder = bookmarkHolder;
104110
this.resultCursors = resultCursors;
105111
this.fetchSize = fetchSize;
112+
this.logging = logging;
106113
}
107114

108115
public CompletionStage<UnmanagedTransaction> beginAsync(Bookmark initialBookmark, TransactionConfig config) {
109-
return protocol.beginTransaction(connection, initialBookmark, config).handle((ignore, beginError) -> {
110-
if (beginError != null) {
111-
if (beginError instanceof AuthorizationExpiredException) {
112-
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
113-
} else if (beginError instanceof ConnectionReadTimeoutException) {
114-
connection.terminateAndRelease(beginError.getMessage());
115-
} else {
116-
connection.release();
117-
}
118-
throw asCompletionException(beginError);
119-
}
120-
return this;
121-
});
116+
return protocol.beginTransaction(connection, initialBookmark, config, logging)
117+
.handle((ignore, beginError) -> {
118+
if (beginError != null) {
119+
if (beginError instanceof AuthorizationExpiredException) {
120+
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
121+
} else if (beginError instanceof ConnectionReadTimeoutException) {
122+
connection.terminateAndRelease(beginError.getMessage());
123+
} else {
124+
connection.release();
125+
}
126+
throw asCompletionException(beginError);
127+
}
128+
return this;
129+
});
122130
}
123131

124132
public CompletionStage<Void> closeAsync() {

driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import org.neo4j.driver.AccessMode;
2626
import org.neo4j.driver.Bookmark;
27+
import org.neo4j.driver.Logging;
2728
import org.neo4j.driver.Query;
2829
import org.neo4j.driver.Value;
2930
import org.neo4j.driver.internal.BookmarkHolder;
@@ -42,8 +43,8 @@ public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingP
4243
static final String MULTI_DB_GET_ROUTING_TABLE =
4344
String.format("CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME);
4445

45-
public MultiDatabasesRoutingProcedureRunner(RoutingContext context) {
46-
super(context);
46+
public MultiDatabasesRoutingProcedureRunner(RoutingContext context, Logging logging) {
47+
super(context, logging);
4748
}
4849

4950
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionException;
2727
import java.util.concurrent.CompletionStage;
2828
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.Logging;
2930
import org.neo4j.driver.Query;
3031
import org.neo4j.driver.Record;
3132
import org.neo4j.driver.exceptions.ProtocolException;
@@ -42,11 +43,11 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos
4243
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
4344
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;
4445

45-
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext) {
46+
public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext routingContext, Logging logging) {
4647
this(
4748
clock,
48-
new SingleDatabaseRoutingProcedureRunner(routingContext),
49-
new MultiDatabasesRoutingProcedureRunner(routingContext),
49+
new SingleDatabaseRoutingProcedureRunner(routingContext, logging),
50+
new MultiDatabasesRoutingProcedureRunner(routingContext, logging),
5051
new RouteMessageRoutingProcedureRunner(routingContext));
5152
}
5253

driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.CompletionStage;
2828
import org.neo4j.driver.AccessMode;
2929
import org.neo4j.driver.Bookmark;
30+
import org.neo4j.driver.Logging;
3031
import org.neo4j.driver.Query;
3132
import org.neo4j.driver.Record;
3233
import org.neo4j.driver.TransactionConfig;
@@ -49,9 +50,11 @@ public class SingleDatabaseRoutingProcedureRunner implements RoutingProcedureRun
4950
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";
5051

5152
final RoutingContext context;
53+
private Logging logging;
5254

53-
public SingleDatabaseRoutingProcedureRunner(RoutingContext context) {
55+
public SingleDatabaseRoutingProcedureRunner(RoutingContext context, Logging logging) {
5456
this.context = context;
57+
this.logging = logging;
5558
}
5659

5760
@Override
@@ -87,7 +90,7 @@ CompletionStage<List<Record>> runProcedure(Connection connection, Query procedur
8790
return connection
8891
.protocol()
8992
.runInAutoCommitTransaction(
90-
connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)
93+
connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE, logging)
9194
.asyncResult()
9295
.thenCompose(ResultCursor::listAsync);
9396
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ private static Rediscovery createRediscovery(
297297
Logging logging,
298298
DomainNameResolver domainNameResolver) {
299299
ClusterCompositionProvider clusterCompositionProvider =
300-
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
300+
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext(), logging);
301301
return new RediscoveryImpl(
302302
initialRouter,
303303
settings,

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletionStage;
2626
import org.neo4j.driver.AuthToken;
2727
import org.neo4j.driver.Bookmark;
28+
import org.neo4j.driver.Logging;
2829
import org.neo4j.driver.Query;
2930
import org.neo4j.driver.Session;
3031
import org.neo4j.driver.Transaction;
@@ -77,9 +78,11 @@ void initializeChannel(
7778
* @param connection the connection to use.
7879
* @param bookmark the bookmarks. Never null, should be {@link InternalBookmark#empty()} when absent.
7980
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
81+
* @param logging the driver logging
8082
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
8183
*/
82-
CompletionStage<Void> beginTransaction(Connection connection, Bookmark bookmark, TransactionConfig config);
84+
CompletionStage<Void> beginTransaction(
85+
Connection connection, Bookmark bookmark, TransactionConfig config, Logging logging);
8386

8487
/**
8588
* Commit the unmanaged transaction.
@@ -105,14 +108,16 @@ void initializeChannel(
105108
* @param bookmarkHolder the bookmarksHolder that keeps track of the current bookmark and can be updated with a new bookmark.
106109
* @param config the transaction config for the implicitly started auto-commit transaction.
107110
* @param fetchSize the record fetch size for PULL message.
111+
* @param logging the driver logging
108112
* @return stage with cursor.
109113
*/
110114
ResultCursorFactory runInAutoCommitTransaction(
111115
Connection connection,
112116
Query query,
113117
BookmarkHolder bookmarkHolder,
114118
TransactionConfig config,
115-
long fetchSize);
119+
long fetchSize,
120+
Logging logging);
116121

117122
/**
118123
* Execute the given query in a running unmanaged transaction, i.e. {@link Transaction#run(Query)}.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Objects;
2626
import org.neo4j.driver.AccessMode;
2727
import org.neo4j.driver.Bookmark;
28+
import org.neo4j.driver.Logging;
2829
import org.neo4j.driver.TransactionConfig;
2930
import org.neo4j.driver.Value;
3031
import org.neo4j.driver.internal.DatabaseName;
@@ -37,8 +38,9 @@ public BeginMessage(
3738
TransactionConfig config,
3839
DatabaseName databaseName,
3940
AccessMode mode,
40-
String impersonatedUser) {
41-
this(bookmark, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser);
41+
String impersonatedUser,
42+
Logging logging) {
43+
this(bookmark, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser, logging);
4244
}
4345

4446
public BeginMessage(
@@ -47,8 +49,9 @@ public BeginMessage(
4749
Map<String, Value> txMetadata,
4850
AccessMode mode,
4951
DatabaseName databaseName,
50-
String impersonatedUser) {
51-
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser));
52+
String impersonatedUser,
53+
Logging logging) {
54+
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser, logging));
5255
}
5356

5457
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Objects;
2828
import org.neo4j.driver.AccessMode;
2929
import org.neo4j.driver.Bookmark;
30+
import org.neo4j.driver.Logging;
3031
import org.neo4j.driver.Query;
3132
import org.neo4j.driver.TransactionConfig;
3233
import org.neo4j.driver.Value;
@@ -44,9 +45,10 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
4445
DatabaseName databaseName,
4546
AccessMode mode,
4647
Bookmark bookmark,
47-
String impersonatedUser) {
48+
String impersonatedUser,
49+
Logging logging) {
4850
return autoCommitTxRunMessage(
49-
query, config.timeout(), config.metadata(), databaseName, mode, bookmark, impersonatedUser);
51+
query, config.timeout(), config.metadata(), databaseName, mode, bookmark, impersonatedUser, logging);
5052
}
5153

5254
public static RunWithMetadataMessage autoCommitTxRunMessage(
@@ -56,9 +58,10 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
5658
DatabaseName databaseName,
5759
AccessMode mode,
5860
Bookmark bookmark,
59-
String impersonatedUser) {
61+
String impersonatedUser,
62+
Logging logging) {
6063
Map<String, Value> metadata =
61-
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser);
64+
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmark, impersonatedUser, logging);
6265
return new RunWithMetadataMessage(query.text(), query.parameters().asMap(ofValue()), metadata);
6366
}
6467

driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionMetadataBuilder.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Map;
2727
import org.neo4j.driver.AccessMode;
2828
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.Logger;
30+
import org.neo4j.driver.Logging;
2931
import org.neo4j.driver.Value;
3032
import org.neo4j.driver.internal.DatabaseName;
3133
import org.neo4j.driver.internal.util.Iterables;
@@ -44,8 +46,9 @@ public static Map<String, Value> buildMetadata(
4446
Map<String, Value> txMetadata,
4547
AccessMode mode,
4648
Bookmark bookmark,
47-
String impersonatedUser) {
48-
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmark, impersonatedUser);
49+
String impersonatedUser,
50+
Logging logging) {
51+
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmark, impersonatedUser, logging);
4952
}
5053

5154
public static Map<String, Value> buildMetadata(
@@ -54,7 +57,8 @@ public static Map<String, Value> buildMetadata(
5457
DatabaseName databaseName,
5558
AccessMode mode,
5659
Bookmark bookmark,
57-
String impersonatedUser) {
60+
String impersonatedUser,
61+
Logging logging) {
5862
boolean bookmarksPresent = bookmark != null && !bookmark.isEmpty();
5963
boolean txTimeoutPresent = txTimeout != null;
6064
boolean txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
@@ -77,7 +81,14 @@ public static Map<String, Value> buildMetadata(
7781
result.put(BOOKMARKS_METADATA_KEY, value(bookmark.values()));
7882
}
7983
if (txTimeoutPresent) {
80-
result.put(TX_TIMEOUT_METADATA_KEY, value(txTimeout.toMillis()));
84+
long millis = txTimeout.toMillis();
85+
if (txTimeout.getNano() % 1_000_000 > 0) {
86+
Logger log = logging.getLog(TransactionMetadataBuilder.class);
87+
millis++;
88+
log.info(
89+
"The transaction timeout has been rounded up to next millisecond value since the config had a fractional millisecond value");
90+
}
91+
result.put(TX_TIMEOUT_METADATA_KEY, value(millis));
8192
}
8293
if (txMetadataPresent) {
8394
result.put(TX_METADATA_METADATA_KEY, value(txMetadata));

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.CompletionStage;
3333
import org.neo4j.driver.AuthToken;
3434
import org.neo4j.driver.Bookmark;
35+
import org.neo4j.driver.Logging;
3536
import org.neo4j.driver.Query;
3637
import org.neo4j.driver.TransactionConfig;
3738
import org.neo4j.driver.internal.BookmarkHolder;
@@ -110,7 +111,8 @@ public void prepareToCloseChannel(Channel channel) {
110111
}
111112

112113
@Override
113-
public CompletionStage<Void> beginTransaction(Connection connection, Bookmark bookmark, TransactionConfig config) {
114+
public CompletionStage<Void> beginTransaction(
115+
Connection connection, Bookmark bookmark, TransactionConfig config, Logging logging) {
114116
try {
115117
verifyDatabaseNameBeforeTransaction(connection.databaseName());
116118
} catch (Exception error) {
@@ -119,7 +121,7 @@ public CompletionStage<Void> beginTransaction(Connection connection, Bookmark bo
119121

120122
CompletableFuture<Void> beginTxFuture = new CompletableFuture<>();
121123
BeginMessage beginMessage = new BeginMessage(
122-
bookmark, config, connection.databaseName(), connection.mode(), connection.impersonatedUser());
124+
bookmark, config, connection.databaseName(), connection.mode(), connection.impersonatedUser(), logging);
123125
connection.writeAndFlush(beginMessage, new BeginTxResponseHandler(beginTxFuture));
124126
return beginTxFuture;
125127
}
@@ -144,15 +146,17 @@ public ResultCursorFactory runInAutoCommitTransaction(
144146
Query query,
145147
BookmarkHolder bookmarkHolder,
146148
TransactionConfig config,
147-
long fetchSize) {
149+
long fetchSize,
150+
Logging logging) {
148151
verifyDatabaseNameBeforeTransaction(connection.databaseName());
149152
RunWithMetadataMessage runMessage = autoCommitTxRunMessage(
150153
query,
151154
config,
152155
connection.databaseName(),
153156
connection.mode(),
154157
bookmarkHolder.getBookmark(),
155-
connection.impersonatedUser());
158+
connection.impersonatedUser(),
159+
logging);
156160
return buildResultCursorFactory(connection, query, bookmarkHolder, null, runMessage, fetchSize);
157161
}
158162

0 commit comments

Comments
 (0)