Skip to content

Commit 855fa3c

Browse files
committed
Improve explicit transaction terminated state handling
An explicit transaction is considered terminated as soon as a failure occurs. Any usage of the terminated transaction and any of its results must be stopped and the transaction must be closed explicitly. This update aims to ensure that the driver does not send further Bolt messages in regards to the terminated transaction. In addition, a new `TransactionTerminatedException` class has been introduced. It is a subclass of the previously used `ClientException`, making this exception more specific. The exception will contain a non-null `code` if it is created based on the server's response. It will not have a code if it is generated by the driver. Depending on a failure cause, the result handles may emit other exceptions respectively, matching the driver's existing behaviour.
1 parent 52aef12 commit 855fa3c

29 files changed

+771
-563
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
import java.io.Serial;
22+
23+
/**
24+
* Indicates that the transaction has been terminated.
25+
* <p>
26+
* Any usage of the terminated transaction and any of its results must be stopped and the transaction must be closed
27+
* explicitly. Moreover, any error in the transaction result(s) should be considered as a transaction termination and
28+
* must be handled in the same way.
29+
* <p>
30+
* The exception will contain a non-null {@link #code()} if it is created based on the server's response. It will not
31+
* have a code if it is generated by the driver.
32+
*
33+
* @since 5.11
34+
*/
35+
public class TransactionTerminatedException extends ClientException {
36+
@Serial
37+
private static final long serialVersionUID = 7639191706067500206L;
38+
39+
/**
40+
* Creates a new instance.
41+
*
42+
* @param message the message
43+
*/
44+
public TransactionTerminatedException(String message) {
45+
super(message);
46+
}
47+
48+
/**
49+
* Creates a new instance.
50+
*
51+
* @param code the code
52+
* @param message the message
53+
*/
54+
public TransactionTerminatedException(String code, String message) {
55+
super(code, message);
56+
}
57+
58+
/**
59+
* Creates a new instance.
60+
*
61+
* @param message the message
62+
* @param cause the cause
63+
*/
64+
public TransactionTerminatedException(String message, Throwable cause) {
65+
super(message, cause);
66+
}
67+
}

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,22 @@ public boolean isOpen() {
6666
return tx.isOpen();
6767
}
6868

69+
/**
70+
* <b>THIS IS A PRIVATE API</b>
71+
* <p>
72+
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
73+
* transaction has not already been terminated, is not closed or closing.
74+
*
75+
* @since 5.11
76+
* @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
77+
* @see org.neo4j.driver.exceptions.TransactionTerminatedException
78+
*/
79+
public void terminate() {
80+
Futures.blockingGet(
81+
tx.terminateAsync(),
82+
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction"));
83+
}
84+
6985
private void terminateConnectionOnThreadInterrupt(String reason) {
7086
tx.connection().terminateAndRelease(reason);
7187
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 90 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
2323
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
2424
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
25+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
2526

2627
import io.netty.channel.Channel;
2728
import io.netty.channel.ChannelHandler;
2829
import java.time.Clock;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.CompletionStage;
3132
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.function.Consumer;
3336
import org.neo4j.driver.Logger;
3437
import org.neo4j.driver.Logging;
3538
import org.neo4j.driver.internal.BoltServerAddress;
@@ -41,7 +44,12 @@
4144
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
4245
import org.neo4j.driver.internal.messaging.BoltProtocol;
4346
import org.neo4j.driver.internal.messaging.Message;
47+
import org.neo4j.driver.internal.messaging.request.DiscardAllMessage;
48+
import org.neo4j.driver.internal.messaging.request.DiscardMessage;
49+
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
50+
import org.neo4j.driver.internal.messaging.request.PullMessage;
4451
import org.neo4j.driver.internal.messaging.request.ResetMessage;
52+
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
4553
import org.neo4j.driver.internal.metrics.ListenerEvent;
4654
import org.neo4j.driver.internal.metrics.MetricsListener;
4755
import org.neo4j.driver.internal.spi.Connection;
@@ -53,6 +61,7 @@
5361
*/
5462
public class NetworkConnection implements Connection {
5563
private final Logger log;
64+
private final Lock lock;
5665
private final Channel channel;
5766
private final InboundMessageDispatcher messageDispatcher;
5867
private final String serverAgent;
@@ -61,12 +70,13 @@ public class NetworkConnection implements Connection {
6170
private final ExtendedChannelPool channelPool;
6271
private final CompletableFuture<Void> releaseFuture;
6372
private final Clock clock;
64-
65-
private final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
6673
private final MetricsListener metricsListener;
6774
private final ListenerEvent<?> inUseEvent;
6875

6976
private final Long connectionReadTimeout;
77+
78+
private Status status = Status.OPEN;
79+
private UnmanagedTransaction transaction;
7080
private ChannelHandler connectionReadTimeoutHandler;
7181

7282
public NetworkConnection(
@@ -76,6 +86,7 @@ public NetworkConnection(
7686
MetricsListener metricsListener,
7787
Logging logging) {
7888
this.log = logging.getLog(getClass());
89+
this.lock = new ReentrantLock();
7990
this.channel = channel;
8091
this.messageDispatcher = ChannelAttributes.messageDispatcher(channel);
8192
this.serverAgent = ChannelAttributes.serverAgent(channel);
@@ -93,7 +104,7 @@ public NetworkConnection(
93104

94105
@Override
95106
public boolean isOpen() {
96-
return status.get() == Status.OPEN;
107+
return executeWithLock(lock, () -> status == Status.OPEN);
97108
}
98109

99110
@Override
@@ -110,52 +121,31 @@ public void disableAutoRead() {
110121
}
111122
}
112123

113-
@Override
114-
public void flush() {
115-
if (verifyOpen(null, null)) {
116-
flushInEventLoop();
117-
}
118-
}
119-
120124
@Override
121125
public void write(Message message, ResponseHandler handler) {
122-
if (verifyOpen(handler, null)) {
126+
if (verifyOpen(handler)) {
123127
writeMessageInEventLoop(message, handler, false);
124128
}
125129
}
126130

127-
@Override
128-
public void write(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2) {
129-
if (verifyOpen(handler1, handler2)) {
130-
writeMessagesInEventLoop(message1, handler1, message2, handler2, false);
131-
}
132-
}
133-
134131
@Override
135132
public void writeAndFlush(Message message, ResponseHandler handler) {
136-
if (verifyOpen(handler, null)) {
133+
if (verifyOpen(handler)) {
137134
writeMessageInEventLoop(message, handler, true);
138135
}
139136
}
140137

141138
@Override
142-
public void writeAndFlush(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2) {
143-
if (verifyOpen(handler1, handler2)) {
144-
writeMessagesInEventLoop(message1, handler1, message2, handler2, true);
145-
}
146-
}
147-
148-
@Override
149-
public CompletionStage<Void> reset() {
150-
CompletableFuture<Void> result = new CompletableFuture<>();
151-
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result);
139+
public CompletionStage<Void> reset(Throwable throwable) {
140+
var result = new CompletableFuture<Void>();
141+
var handler = new ResetResponseHandler(messageDispatcher, result, throwable);
152142
writeResetMessageIfNeeded(handler, true);
153143
return result;
154144
}
155145

156146
@Override
157147
public CompletionStage<Void> release() {
158-
if (status.compareAndSet(Status.OPEN, Status.RELEASED)) {
148+
if (executeWithLock(lock, () -> updateStateIfOpen(Status.RELEASED))) {
159149
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler(
160150
channel, channelPool, messageDispatcher, clock, releaseFuture);
161151

@@ -167,7 +157,7 @@ public CompletionStage<Void> release() {
167157

168158
@Override
169159
public void terminateAndRelease(String reason) {
170-
if (status.compareAndSet(Status.OPEN, Status.TERMINATED)) {
160+
if (executeWithLock(lock, () -> updateStateIfOpen(Status.TERMINATED))) {
171161
setTerminationReason(channel, reason);
172162
asCompletionStage(channel.close())
173163
.exceptionally(throwable -> null)
@@ -194,6 +184,25 @@ public BoltProtocol protocol() {
194184
return protocol;
195185
}
196186

187+
@Override
188+
public void bindTransaction(UnmanagedTransaction transaction) {
189+
executeWithLock(lock, () -> {
190+
if (this.transaction != null) {
191+
throw new IllegalStateException("transaction is already set");
192+
}
193+
this.transaction = transaction;
194+
});
195+
}
196+
197+
private boolean updateStateIfOpen(Status newStatus) {
198+
if (Status.OPEN.equals(status)) {
199+
status = newStatus;
200+
return true;
201+
} else {
202+
return false;
203+
}
204+
}
205+
197206
private void writeResetMessageIfNeeded(ResponseHandler resetHandler, boolean isSessionReset) {
198207
channel.eventLoop().execute(() -> {
199208
if (isSessionReset && !isOpen()) {
@@ -208,73 +217,49 @@ private void writeResetMessageIfNeeded(ResponseHandler resetHandler, boolean isS
208217
});
209218
}
210219

211-
private void flushInEventLoop() {
212-
channel.eventLoop().execute(() -> {
213-
channel.flush();
214-
registerConnectionReadTimeout(channel);
215-
});
216-
}
217-
218220
private void writeMessageInEventLoop(Message message, ResponseHandler handler, boolean flush) {
219-
channel.eventLoop().execute(() -> {
220-
messageDispatcher.enqueue(handler);
221-
222-
if (flush) {
223-
channel.writeAndFlush(message).addListener(future -> registerConnectionReadTimeout(channel));
224-
} else {
225-
channel.write(message, channel.voidPromise());
226-
}
227-
});
228-
}
229-
230-
private void writeMessagesInEventLoop(
231-
Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2, boolean flush) {
232-
channel.eventLoop().execute(() -> {
233-
messageDispatcher.enqueue(handler1);
234-
messageDispatcher.enqueue(handler2);
235-
236-
channel.write(message1, channel.voidPromise());
237-
238-
if (flush) {
239-
channel.writeAndFlush(message2).addListener(future -> registerConnectionReadTimeout(channel));
240-
} else {
241-
channel.write(message2, channel.voidPromise());
242-
}
243-
});
221+
channel.eventLoop()
222+
.execute(() -> transactionTerminationAwareExecutor(message).accept(causeOfTermination -> {
223+
if (causeOfTermination == null) {
224+
messageDispatcher.enqueue(handler);
225+
226+
if (flush) {
227+
channel.writeAndFlush(message)
228+
.addListener(future -> registerConnectionReadTimeout(channel));
229+
} else {
230+
channel.write(message, channel.voidPromise());
231+
}
232+
} else {
233+
handler.onFailure(causeOfTermination);
234+
}
235+
}));
244236
}
245237

246238
private void setAutoRead(boolean value) {
247239
channel.config().setAutoRead(value);
248240
}
249241

250-
private boolean verifyOpen(ResponseHandler handler1, ResponseHandler handler2) {
251-
Status connectionStatus = this.status.get();
252-
switch (connectionStatus) {
253-
case OPEN:
254-
return true;
255-
case RELEASED:
242+
private boolean verifyOpen(ResponseHandler handler) {
243+
var connectionStatus = executeWithLock(lock, () -> status);
244+
return switch (connectionStatus) {
245+
case OPEN -> true;
246+
case RELEASED -> {
256247
Exception error =
257248
new IllegalStateException("Connection has been released to the pool and can't be used");
258-
if (handler1 != null) {
259-
handler1.onFailure(error);
249+
if (handler != null) {
250+
handler.onFailure(error);
260251
}
261-
if (handler2 != null) {
262-
handler2.onFailure(error);
263-
}
264-
return false;
265-
case TERMINATED:
252+
yield false;
253+
}
254+
case TERMINATED -> {
266255
Exception terminatedError =
267256
new IllegalStateException("Connection has been terminated and can't be used");
268-
if (handler1 != null) {
269-
handler1.onFailure(terminatedError);
270-
}
271-
if (handler2 != null) {
272-
handler2.onFailure(terminatedError);
257+
if (handler != null) {
258+
handler.onFailure(terminatedError);
273259
}
274-
return false;
275-
default:
276-
throw new IllegalStateException("Unknown status: " + connectionStatus);
277-
}
260+
yield false;
261+
}
262+
};
278263
}
279264

280265
private void registerConnectionReadTimeout(Channel channel) {
@@ -295,6 +280,25 @@ private void registerConnectionReadTimeout(Channel channel) {
295280
}
296281
}
297282

283+
private Consumer<Consumer<Throwable>> transactionTerminationAwareExecutor(Message message) {
284+
var result = (Consumer<Consumer<Throwable>>) consumer -> consumer.accept(null);
285+
if (isQueryMessage(message)) {
286+
var transaction = executeWithLock(lock, () -> this.transaction);
287+
if (transaction != null) {
288+
result = transaction::executeWithLockedState;
289+
}
290+
}
291+
return result;
292+
}
293+
294+
private boolean isQueryMessage(Message message) {
295+
return message instanceof RunWithMetadataMessage
296+
|| message instanceof PullMessage
297+
|| message instanceof PullAllMessage
298+
|| message instanceof DiscardMessage
299+
|| message instanceof DiscardAllMessage;
300+
}
301+
298302
private enum Status {
299303
OPEN,
300304
RELEASED,

0 commit comments

Comments
 (0)