Skip to content

Commit 0c92a88

Browse files
committed
Introduce BEGIN message pipelining in ExecutableQuery
This is a communication optimization.
1 parent c3b10e3 commit 0c92a88

22 files changed

+183
-117
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalExecutableQuery.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.Map;
2424
import java.util.stream.Collector;
25+
import org.neo4j.driver.AccessMode;
2526
import org.neo4j.driver.Driver;
2627
import org.neo4j.driver.ExecutableQuery;
2728
import org.neo4j.driver.Query;
@@ -30,6 +31,7 @@
3031
import org.neo4j.driver.RoutingControl;
3132
import org.neo4j.driver.SessionConfig;
3233
import org.neo4j.driver.TransactionCallback;
34+
import org.neo4j.driver.TransactionConfig;
3335

3436
public class InternalExecutableQuery implements ExecutableQuery {
3537
private final Driver driver;
@@ -67,7 +69,7 @@ public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinish
6769
var supplier = recordCollector.supplier();
6870
var accumulator = recordCollector.accumulator();
6971
var finisher = recordCollector.finisher();
70-
try (var session = driver.session(sessionConfigBuilder.build())) {
72+
try (var session = (InternalSession) driver.session(sessionConfigBuilder.build())) {
7173
TransactionCallback<T> txCallback = tx -> {
7274
var result = tx.run(query);
7375
var container = supplier.get();
@@ -78,9 +80,8 @@ public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinish
7880
var summary = result.consume();
7981
return resultFinisher.finish(result.keys(), finishedValue, summary);
8082
};
81-
return config.routing().equals(RoutingControl.READ)
82-
? session.executeRead(txCallback)
83-
: session.executeWrite(txCallback);
83+
var accessMode = config.routing().equals(RoutingControl.WRITE) ? AccessMode.WRITE : AccessMode.READ;
84+
return session.execute(accessMode, txCallback, TransactionConfig.empty(), false);
8485
}
8586
}
8687

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ public <T> T readTransaction(TransactionWork<T> work) {
107107
@Override
108108
@Deprecated
109109
public <T> T readTransaction(TransactionWork<T> work, TransactionConfig config) {
110-
return transaction(AccessMode.READ, work, config);
110+
return transaction(AccessMode.READ, work, config, true);
111111
}
112112

113113
@Override
114114
public <T> T executeRead(TransactionCallback<T> callback, TransactionConfig config) {
115-
return readTransaction(tx -> callback.execute(new DelegatingTransactionContext(tx)), config);
115+
return execute(AccessMode.READ, callback, config, true);
116116
}
117117

118118
@Override
@@ -124,12 +124,12 @@ public <T> T writeTransaction(TransactionWork<T> work) {
124124
@Override
125125
@Deprecated
126126
public <T> T writeTransaction(TransactionWork<T> work, TransactionConfig config) {
127-
return transaction(AccessMode.WRITE, work, config);
127+
return transaction(AccessMode.WRITE, work, config, true);
128128
}
129129

130130
@Override
131131
public <T> T executeWrite(TransactionCallback<T> callback, TransactionConfig config) {
132-
return writeTransaction(tx -> callback.execute(new DelegatingTransactionContext(tx)), config);
132+
return execute(AccessMode.WRITE, callback, config, true);
133133
}
134134

135135
@Override
@@ -151,14 +151,21 @@ public void reset() {
151151
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while resetting the session"));
152152
}
153153

154+
<T> T execute(AccessMode accessMode, TransactionCallback<T> callback, TransactionConfig config, boolean flush) {
155+
return transaction(accessMode, tx -> callback.execute(new DelegatingTransactionContext(tx)), config, flush);
156+
}
157+
154158
private <T> T transaction(
155-
AccessMode mode, @SuppressWarnings("deprecation") TransactionWork<T> work, TransactionConfig config) {
159+
AccessMode mode,
160+
@SuppressWarnings("deprecation") TransactionWork<T> work,
161+
TransactionConfig config,
162+
boolean flush) {
156163
// use different code path compared to async so that work is executed in the caller thread
157164
// caller thread will also be the one who sleeps between retries;
158165
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
159166
// event loop thread will bock and wait for itself to read some data
160167
return session.retryLogic().retry(() -> {
161-
try (var tx = beginTransaction(mode, config)) {
168+
try (var tx = beginTransaction(mode, config, flush)) {
162169

163170
var result = work.execute(tx);
164171
if (result instanceof Result) {
@@ -175,9 +182,9 @@ private <T> T transaction(
175182
});
176183
}
177184

178-
private Transaction beginTransaction(AccessMode mode, TransactionConfig config) {
185+
private Transaction beginTransaction(AccessMode mode, TransactionConfig config, boolean flush) {
179186
var tx = Futures.blockingGet(
180-
session.beginTransactionAsync(mode, config),
187+
session.beginTransactionAsync(mode, config, null, flush),
181188
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
182189
return new InternalTransaction(tx);
183190
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,19 @@ public CompletionStage<RxResultCursor> runRx(
127127
}
128128

129129
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(TransactionConfig config) {
130-
return beginTransactionAsync(mode, config, null);
130+
return beginTransactionAsync(mode, config, null, true);
131131
}
132132

133133
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(TransactionConfig config, String txType) {
134-
return this.beginTransactionAsync(mode, config, txType);
134+
return this.beginTransactionAsync(mode, config, txType, true);
135135
}
136136

137137
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(AccessMode mode, TransactionConfig config) {
138-
return beginTransactionAsync(mode, config, null);
138+
return beginTransactionAsync(mode, config, null, true);
139139
}
140140

141141
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
142-
AccessMode mode, TransactionConfig config, String txType) {
142+
AccessMode mode, TransactionConfig config, String txType, boolean flush) {
143143
ensureSessionIsOpen();
144144

145145
// create a chain that acquires connection and starts a transaction
@@ -150,7 +150,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
150150
.thenCompose(connection -> {
151151
var tx = new UnmanagedTransaction(
152152
connection, this::handleNewBookmark, fetchSize, notificationConfig, logging);
153-
return tx.beginAsync(determineBookmarks(true), config, txType);
153+
return tx.beginAsync(determineBookmarks(true), config, txType, flush);
154154
});
155155

156156
// update the reference to the only known transaction

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private enum State {
9999
private Throwable causeOfTermination;
100100
private CompletionStage<Void> terminationStage;
101101
private final NotificationConfig notificationConfig;
102+
private final CompletableFuture<UnmanagedTransaction> beginFuture = new CompletableFuture<>();
102103
private final Logging logging;
103104

104105
public UnmanagedTransaction(
@@ -128,9 +129,10 @@ protected UnmanagedTransaction(
128129
connection.bindTerminationAwareStateLockingExecutor(this);
129130
}
130131

132+
// flush = false is only supported for async mode with a single subsequent run
131133
public CompletionStage<UnmanagedTransaction> beginAsync(
132-
Set<Bookmark> initialBookmarks, TransactionConfig config, String txType) {
133-
return protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig, logging)
134+
Set<Bookmark> initialBookmarks, TransactionConfig config, String txType, boolean flush) {
135+
protocol.beginTransaction(connection, initialBookmarks, config, txType, notificationConfig, logging, flush)
134136
.handle((ignore, beginError) -> {
135137
if (beginError != null) {
136138
if (beginError instanceof AuthorizationExpiredException) {
@@ -143,7 +145,9 @@ public CompletionStage<UnmanagedTransaction> beginAsync(
143145
throw asCompletionException(beginError);
144146
}
145147
return this;
146-
});
148+
})
149+
.whenComplete(futureCompletingConsumer(beginFuture));
150+
return flush ? beginFuture : CompletableFuture.completedFuture(this);
147151
}
148152

149153
public CompletionStage<Void> closeAsync() {
@@ -167,9 +171,9 @@ public CompletionStage<ResultCursor> runAsync(Query query) {
167171
var cursorStage = protocol.runInUnmanagedTransaction(connection, query, this, fetchSize)
168172
.asyncResult();
169173
resultCursors.add(cursorStage);
170-
return cursorStage
174+
return beginFuture.thenCompose(ignored -> cursorStage
171175
.thenCompose(AsyncResultCursor::mapSuccessfulRunCompletionAsync)
172-
.thenApply(Function.identity());
176+
.thenApply(Function.identity()));
173177
}
174178

175179
public CompletionStage<RxResultCursor> runRx(Query query) {

driver/src/main/java/org/neo4j/driver/internal/handlers/HelloV51ResponseHandler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
package org.neo4j.driver.internal.handlers;
2020

2121
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
22+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
2223
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAgent;
2324
import static org.neo4j.driver.internal.util.MetadataExtractor.extractServer;
2425

2526
import io.netty.channel.Channel;
2627
import java.util.Map;
28+
import java.util.Optional;
2729
import java.util.concurrent.CompletableFuture;
30+
import java.util.function.Supplier;
2831
import org.neo4j.driver.Value;
2932
import org.neo4j.driver.internal.spi.ResponseHandler;
3033

3134
public class HelloV51ResponseHandler implements ResponseHandler {
3235
private static final String CONNECTION_ID_METADATA_KEY = "connection_id";
36+
public static final String CONFIGURATION_HINTS_KEY = "hints";
37+
public static final String CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY = "connection.recv_timeout_seconds";
3338

3439
private final Channel channel;
3540
private final CompletableFuture<Void> helloFuture;
@@ -48,6 +53,8 @@ public void onSuccess(Map<String, Value> metadata) {
4853
var connectionId = extractConnectionId(metadata);
4954
setConnectionId(channel, connectionId);
5055

56+
processConfigurationHints(metadata);
57+
5158
helloFuture.complete(null);
5259
} catch (Throwable error) {
5360
onFailure(error);
@@ -65,6 +72,16 @@ public void onRecord(Value[] fields) {
6572
throw new UnsupportedOperationException();
6673
}
6774

75+
private void processConfigurationHints(Map<String, Value> metadata) {
76+
var configurationHints = metadata.get(CONFIGURATION_HINTS_KEY);
77+
if (configurationHints != null) {
78+
getFromSupplierOrEmptyOnException(() -> configurationHints
79+
.get(CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY)
80+
.asLong())
81+
.ifPresent(timeout -> setConnectionReadTimeout(channel, timeout));
82+
}
83+
}
84+
6885
private static String extractConnectionId(Map<String, Value> metadata) {
6986
var value = metadata.get(CONNECTION_ID_METADATA_KEY);
7087
if (value == null || value.isNull()) {
@@ -73,4 +90,12 @@ private static String extractConnectionId(Map<String, Value> metadata) {
7390
}
7491
return value.asString();
7592
}
93+
94+
private static <T> Optional<T> getFromSupplierOrEmptyOnException(Supplier<T> supplier) {
95+
try {
96+
return Optional.of(supplier.get());
97+
} catch (Exception e) {
98+
return Optional.empty();
99+
}
100+
}
76101
}

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ void initializeChannel(
9595
* @param txType the Kernel transaction type
9696
* @param notificationConfig the notification configuration
9797
* @param logging the driver logging
98+
* @param flush defines whether to flush the message to the connection
9899
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
99100
*/
100101
CompletionStage<Void> beginTransaction(
@@ -103,7 +104,8 @@ CompletionStage<Void> beginTransaction(
103104
TransactionConfig config,
104105
String txType,
105106
NotificationConfig notificationConfig,
106-
Logging logging);
107+
Logging logging,
108+
boolean flush);
107109

108110
/**
109111
* Commit the unmanaged transaction.

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ public CompletionStage<Void> beginTransaction(
137137
TransactionConfig config,
138138
String txType,
139139
NotificationConfig notificationConfig,
140-
Logging logging) {
140+
Logging logging,
141+
boolean flush) {
141142
var exception = verifyNotificationConfigSupported(notificationConfig);
142143
if (exception != null) {
143144
return CompletableFuture.failedStage(exception);
@@ -158,7 +159,12 @@ public CompletionStage<Void> beginTransaction(
158159
txType,
159160
notificationConfig,
160161
logging);
161-
connection.writeAndFlush(beginMessage, new BeginTxResponseHandler(beginTxFuture));
162+
var handler = new BeginTxResponseHandler(beginTxFuture);
163+
if (flush) {
164+
connection.writeAndFlush(beginMessage, handler);
165+
} else {
166+
connection.write(beginMessage, handler);
167+
}
162168
return beginTxFuture;
163169
}
164170

driver/src/test/java/org/neo4j/driver/internal/InternalExecutableQueryTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222
import static org.junit.jupiter.api.Assertions.assertThrows;
2323
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.BDDMockito.given;
2526
import static org.mockito.BDDMockito.then;
2627
import static org.mockito.Mockito.mock;
@@ -29,13 +30,13 @@
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.function.BiConsumer;
32-
import java.util.function.BiFunction;
3333
import java.util.function.Function;
3434
import java.util.stream.Collector;
3535
import org.junit.jupiter.api.Test;
3636
import org.junit.jupiter.params.ParameterizedTest;
3737
import org.junit.jupiter.params.provider.MethodSource;
3838
import org.mockito.ArgumentCaptor;
39+
import org.neo4j.driver.AccessMode;
3940
import org.neo4j.driver.BookmarkManager;
4041
import org.neo4j.driver.Driver;
4142
import org.neo4j.driver.ExecutableQuery;
@@ -44,9 +45,9 @@
4445
import org.neo4j.driver.Record;
4546
import org.neo4j.driver.Result;
4647
import org.neo4j.driver.RoutingControl;
47-
import org.neo4j.driver.Session;
4848
import org.neo4j.driver.SessionConfig;
4949
import org.neo4j.driver.TransactionCallback;
50+
import org.neo4j.driver.TransactionConfig;
5051
import org.neo4j.driver.TransactionContext;
5152
import org.neo4j.driver.summary.ResultSummary;
5253

@@ -126,15 +127,15 @@ void shouldExecuteAndReturnResult(RoutingControl routingControl) {
126127
var driver = mock(Driver.class);
127128
var bookmarkManager = mock(BookmarkManager.class);
128129
given(driver.executableQueryBookmarkManager()).willReturn(bookmarkManager);
129-
var session = mock(Session.class);
130+
var session = mock(InternalSession.class);
130131
given(driver.session(any(SessionConfig.class))).willReturn(session);
131132
var txContext = mock(TransactionContext.class);
132-
BiFunction<Session, TransactionCallback<Object>, Object> executeMethod =
133-
routingControl.equals(RoutingControl.READ) ? Session::executeRead : Session::executeWrite;
134-
given(executeMethod.apply(session, any())).willAnswer(answer -> {
135-
TransactionCallback<?> txCallback = answer.getArgument(0);
136-
return txCallback.execute(txContext);
137-
});
133+
var accessMode = routingControl.equals(RoutingControl.WRITE) ? AccessMode.WRITE : AccessMode.READ;
134+
given(session.execute(eq(accessMode), any(), eq(TransactionConfig.empty()), eq(false)))
135+
.willAnswer(answer -> {
136+
TransactionCallback<?> txCallback = answer.getArgument(1);
137+
return txCallback.execute(txContext);
138+
});
138139
var result = mock(Result.class);
139140
given(txContext.run(any(Query.class))).willReturn(result);
140141
var keys = List.of("key");
@@ -180,7 +181,7 @@ var record = mock(Record.class);
180181
.withBookmarkManager(bookmarkManager)
181182
.build();
182183
assertEquals(expectedSessionConfig, sessionConfig);
183-
executeMethod.apply(then(session).should(), any(TransactionCallback.class));
184+
then(session).should().execute(eq(accessMode), any(), eq(TransactionConfig.empty()), eq(false));
184185
then(txContext).should().run(query.withParameters(params));
185186
then(result).should(times(2)).hasNext();
186187
then(result).should().next();

0 commit comments

Comments
 (0)