Skip to content

Commit e1613f7

Browse files
committed
Fix tx status handling
1 parent 52aef12 commit e1613f7

28 files changed

+766
-563
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
import java.io.Serial;
22+
23+
/**
24+
* Indicates that the transaction has been terminated.
25+
* <p>
26+
* Any usage of the terminated transaction and any of its results must be stopped and the transaction must be closed
27+
* explicitly. Moreover, any error in the transaction result(s) should be considered as a transaction termination and
28+
* must be handled in the same way.
29+
* <p>
30+
* The exception will contain a non-null {@link #code()} if it is created based on the server's response. It will not
31+
* have a code if it is generated by the driver.
32+
*
33+
* @since 5.11
34+
*/
35+
public class TransactionTerminatedException extends ClientException {
36+
@Serial
37+
private static final long serialVersionUID = 7639191706067500206L;
38+
39+
/**
40+
* Creates a new instance.
41+
*
42+
* @param message the message
43+
*/
44+
public TransactionTerminatedException(String message) {
45+
super(message);
46+
}
47+
48+
/**
49+
* Creates a new instance.
50+
*
51+
* @param code the code
52+
* @param message the message
53+
*/
54+
public TransactionTerminatedException(String code, String message) {
55+
super(code, message);
56+
}
57+
58+
/**
59+
* Creates a new instance.
60+
*
61+
* @param message the message
62+
* @param cause the cause
63+
*/
64+
public TransactionTerminatedException(String message, Throwable cause) {
65+
super(message, cause);
66+
}
67+
}

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,22 @@ public boolean isOpen() {
6666
return tx.isOpen();
6767
}
6868

69+
/**
70+
* <b>THIS IS A PRIVATE API</b>
71+
* <p>
72+
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
73+
* transaction has not already been terminated, is not closed or closing.
74+
*
75+
* @since 5.11
76+
* @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
77+
* @see org.neo4j.driver.exceptions.TransactionTerminatedException
78+
*/
79+
public void terminate() {
80+
Futures.blockingGet(
81+
tx.terminateAsync(),
82+
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction"));
83+
}
84+
6985
private void terminateConnectionOnThreadInterrupt(String reason) {
7086
tx.connection().terminateAndRelease(reason);
7187
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 90 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
2323
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
2424
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
25+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
2526

2627
import io.netty.channel.Channel;
2728
import io.netty.channel.ChannelHandler;
2829
import java.time.Clock;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.CompletionStage;
3132
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.function.Consumer;
3336
import org.neo4j.driver.Logger;
3437
import org.neo4j.driver.Logging;
3538
import org.neo4j.driver.internal.BoltServerAddress;
@@ -41,7 +44,12 @@
4144
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
4245
import org.neo4j.driver.internal.messaging.BoltProtocol;
4346
import org.neo4j.driver.internal.messaging.Message;
47+
import org.neo4j.driver.internal.messaging.request.DiscardAllMessage;
48+
import org.neo4j.driver.internal.messaging.request.DiscardMessage;
49+
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
50+
import org.neo4j.driver.internal.messaging.request.PullMessage;
4451
import org.neo4j.driver.internal.messaging.request.ResetMessage;
52+
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
4553
import org.neo4j.driver.internal.metrics.ListenerEvent;
4654
import org.neo4j.driver.internal.metrics.MetricsListener;
4755
import org.neo4j.driver.internal.spi.Connection;
@@ -53,6 +61,7 @@
5361
*/
5462
public class NetworkConnection implements Connection {
5563
private final Logger log;
64+
private final Lock lock;
5665
private final Channel channel;
5766
private final InboundMessageDispatcher messageDispatcher;
5867
private final String serverAgent;
@@ -61,12 +70,13 @@ public class NetworkConnection implements Connection {
6170
private final ExtendedChannelPool channelPool;
6271
private final CompletableFuture<Void> releaseFuture;
6372
private final Clock clock;
64-
65-
private final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
6673
private final MetricsListener metricsListener;
6774
private final ListenerEvent<?> inUseEvent;
6875

6976
private final Long connectionReadTimeout;
77+
78+
private Status status = Status.OPEN;
79+
private UnmanagedTransaction transaction;
7080
private ChannelHandler connectionReadTimeoutHandler;
7181

7282
public NetworkConnection(
@@ -76,6 +86,7 @@ public NetworkConnection(
7686
MetricsListener metricsListener,
7787
Logging logging) {
7888
this.log = logging.getLog(getClass());
89+
this.lock = new ReentrantLock();
7990
this.channel = channel;
8091
this.messageDispatcher = ChannelAttributes.messageDispatcher(channel);
8192
this.serverAgent = ChannelAttributes.serverAgent(channel);
@@ -93,7 +104,7 @@ public NetworkConnection(
93104

94105
@Override
95106
public boolean isOpen() {
96-
return status.get() == Status.OPEN;
107+
return executeWithLock(lock, () -> status == Status.OPEN);
97108
}
98109

99110
@Override
@@ -110,52 +121,31 @@ public void disableAutoRead() {
110121
}
111122
}
112123

113-
@Override
114-
public void flush() {
115-
if (verifyOpen(null, null)) {
116-
flushInEventLoop();
117-
}
118-
}
119-
120124
@Override
121125
public void write(Message message, ResponseHandler handler) {
122-
if (verifyOpen(handler, null)) {
126+
if (verifyOpen(handler)) {
123127
writeMessageInEventLoop(message, handler, false);
124128
}
125129
}
126130

127-
@Override
128-
public void write(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2) {
129-
if (verifyOpen(handler1, handler2)) {
130-
writeMessagesInEventLoop(message1, handler1, message2, handler2, false);
131-
}
132-
}
133-
134131
@Override
135132
public void writeAndFlush(Message message, ResponseHandler handler) {
136-
if (verifyOpen(handler, null)) {
133+
if (verifyOpen(handler)) {
137134
writeMessageInEventLoop(message, handler, true);
138135
}
139136
}
140137

141138
@Override
142-
public void writeAndFlush(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2) {
143-
if (verifyOpen(handler1, handler2)) {
144-
writeMessagesInEventLoop(message1, handler1, message2, handler2, true);
145-
}
146-
}
147-
148-
@Override
149-
public CompletionStage<Void> reset() {
150-
CompletableFuture<Void> result = new CompletableFuture<>();
151-
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result);
139+
public CompletionStage<Void> reset(Throwable throwable) {
140+
var result = new CompletableFuture<Void>();
141+
var handler = new ResetResponseHandler(messageDispatcher, result, throwable);
152142
writeResetMessageIfNeeded(handler, true);
153143
return result;
154144
}
155145

156146
@Override
157147
public CompletionStage<Void> release() {
158-
if (status.compareAndSet(Status.OPEN, Status.RELEASED)) {
148+
if (executeWithLock(lock, () -> updateStateIfOpen(Status.RELEASED))) {
159149
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler(
160150
channel, channelPool, messageDispatcher, clock, releaseFuture);
161151

@@ -167,7 +157,7 @@ public CompletionStage<Void> release() {
167157

168158
@Override
169159
public void terminateAndRelease(String reason) {
170-
if (status.compareAndSet(Status.OPEN, Status.TERMINATED)) {
160+
if (executeWithLock(lock, () -> updateStateIfOpen(Status.TERMINATED))) {
171161
setTerminationReason(channel, reason);
172162
asCompletionStage(channel.close())
173163
.exceptionally(throwable -> null)
@@ -194,6 +184,25 @@ public BoltProtocol protocol() {
194184
return protocol;
195185
}
196186

187+
@Override
188+
public void bindTransaction(UnmanagedTransaction transaction) {
189+
executeWithLock(lock, () -> {
190+
if (this.transaction != null) {
191+
throw new IllegalStateException("transaction is already set");
192+
}
193+
this.transaction = transaction;
194+
});
195+
}
196+
197+
private boolean updateStateIfOpen(Status newStatus) {
198+
if (Status.OPEN.equals(status)) {
199+
status = newStatus;
200+
return true;
201+
} else {
202+
return false;
203+
}
204+
}
205+
197206
private void writeResetMessageIfNeeded(ResponseHandler resetHandler, boolean isSessionReset) {
198207
channel.eventLoop().execute(() -> {
199208
if (isSessionReset && !isOpen()) {
@@ -208,73 +217,49 @@ private void writeResetMessageIfNeeded(ResponseHandler resetHandler, boolean isS
208217
});
209218
}
210219

211-
private void flushInEventLoop() {
212-
channel.eventLoop().execute(() -> {
213-
channel.flush();
214-
registerConnectionReadTimeout(channel);
215-
});
216-
}
217-
218220
private void writeMessageInEventLoop(Message message, ResponseHandler handler, boolean flush) {
219-
channel.eventLoop().execute(() -> {
220-
messageDispatcher.enqueue(handler);
221-
222-
if (flush) {
223-
channel.writeAndFlush(message).addListener(future -> registerConnectionReadTimeout(channel));
224-
} else {
225-
channel.write(message, channel.voidPromise());
226-
}
227-
});
228-
}
229-
230-
private void writeMessagesInEventLoop(
231-
Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2, boolean flush) {
232-
channel.eventLoop().execute(() -> {
233-
messageDispatcher.enqueue(handler1);
234-
messageDispatcher.enqueue(handler2);
235-
236-
channel.write(message1, channel.voidPromise());
237-
238-
if (flush) {
239-
channel.writeAndFlush(message2).addListener(future -> registerConnectionReadTimeout(channel));
240-
} else {
241-
channel.write(message2, channel.voidPromise());
242-
}
243-
});
221+
channel.eventLoop()
222+
.execute(() -> transactionTerminationAwareExecutor(message).accept(causeOfTermination -> {
223+
if (causeOfTermination == null) {
224+
messageDispatcher.enqueue(handler);
225+
226+
if (flush) {
227+
channel.writeAndFlush(message)
228+
.addListener(future -> registerConnectionReadTimeout(channel));
229+
} else {
230+
channel.write(message, channel.voidPromise());
231+
}
232+
} else {
233+
handler.onFailure(causeOfTermination);
234+
}
235+
}));
244236
}
245237

246238
private void setAutoRead(boolean value) {
247239
channel.config().setAutoRead(value);
248240
}
249241

250-
private boolean verifyOpen(ResponseHandler handler1, ResponseHandler handler2) {
251-
Status connectionStatus = this.status.get();
252-
switch (connectionStatus) {
253-
case OPEN:
254-
return true;
255-
case RELEASED:
242+
private boolean verifyOpen(ResponseHandler handler) {
243+
var connectionStatus = executeWithLock(lock, () -> status);
244+
return switch (connectionStatus) {
245+
case OPEN -> true;
246+
case RELEASED -> {
256247
Exception error =
257248
new IllegalStateException("Connection has been released to the pool and can't be used");
258-
if (handler1 != null) {
259-
handler1.onFailure(error);
249+
if (handler != null) {
250+
handler.onFailure(error);
260251
}
261-
if (handler2 != null) {
262-
handler2.onFailure(error);
263-
}
264-
return false;
265-
case TERMINATED:
252+
yield false;
253+
}
254+
case TERMINATED -> {
266255
Exception terminatedError =
267256
new IllegalStateException("Connection has been terminated and can't be used");
268-
if (handler1 != null) {
269-
handler1.onFailure(terminatedError);
270-
}
271-
if (handler2 != null) {
272-
handler2.onFailure(terminatedError);
257+
if (handler != null) {
258+
handler.onFailure(terminatedError);
273259
}
274-
return false;
275-
default:
276-
throw new IllegalStateException("Unknown status: " + connectionStatus);
277-
}
260+
yield false;
261+
}
262+
};
278263
}
279264

280265
private void registerConnectionReadTimeout(Channel channel) {
@@ -295,6 +280,25 @@ private void registerConnectionReadTimeout(Channel channel) {
295280
}
296281
}
297282

283+
private Consumer<Consumer<Throwable>> transactionTerminationAwareExecutor(Message message) {
284+
var result = (Consumer<Consumer<Throwable>>) consumer -> consumer.accept(null);
285+
if (isQueryMessage(message)) {
286+
var transaction = executeWithLock(lock, () -> this.transaction);
287+
if (transaction != null) {
288+
result = transaction::executeWithLockedState;
289+
}
290+
}
291+
return result;
292+
}
293+
294+
private boolean isQueryMessage(Message message) {
295+
return message instanceof RunWithMetadataMessage
296+
|| message instanceof PullMessage
297+
|| message instanceof PullAllMessage
298+
|| message instanceof DiscardMessage
299+
|| message instanceof DiscardAllMessage;
300+
}
301+
298302
private enum Status {
299303
OPEN,
300304
RELEASED,

0 commit comments

Comments
 (0)