Skip to content

Commit 960d77b

Browse files
committed
Add transaction interruption support for internal use
This update is for Neo4j internal use only.
1 parent 8845e0b commit 960d77b

File tree

4 files changed

+118
-0
lines changed

4 files changed

+118
-0
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ private enum State {
9191
private CompletableFuture<Void> commitFuture;
9292
private CompletableFuture<Void> rollbackFuture;
9393
private Throwable causeOfTermination;
94+
private CompletionStage<Void> interruptStage;
9495

9596
public UnmanagedTransaction(Connection connection, BookmarksHolder bookmarksHolder, long fetchSize) {
9697
this(connection, bookmarksHolder, fetchSize, new ResultCursorsHolder());
@@ -303,4 +304,21 @@ private CompletionStage<Void> closeAsync(boolean commit, boolean completeWithNul
303304

304305
return stage;
305306
}
307+
308+
/**
309+
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
310+
* <p>
311+
* <b>THIS METHOD IS NOT PART OF PUBLIC API</b>
312+
*
313+
* @return {@code RESET} response stage
314+
*/
315+
public CompletionStage<Void> interruptAsync() {
316+
return executeWithLock(lock, () -> {
317+
if (interruptStage == null) {
318+
markTerminated(null);
319+
interruptStage = connection.reset();
320+
}
321+
return interruptStage;
322+
});
323+
}
306324
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,15 @@ public Publisher<ReactiveResult> run(Query query) {
5757
})
5858
.map(InternalReactiveResult::new);
5959
}
60+
61+
/**
62+
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
63+
* <p>
64+
* <b>THIS METHOD IS NOT PART OF PUBLIC API</b>
65+
*
66+
* @return {@code RESET} response publisher
67+
*/
68+
public Publisher<Void> interrupt() {
69+
return Mono.fromCompletionStage(tx.interruptAsync());
70+
}
6071
}

driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,33 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt
440440
assertNull(closeStage.toCompletableFuture().join());
441441
}
442442

443+
@Test
444+
void shouldInterruptOnInterruptAsync() {
445+
// Given
446+
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
447+
UnmanagedTransaction tx = beginTx(connection);
448+
449+
// When
450+
await(tx.interruptAsync());
451+
452+
// Then
453+
then(connection).should().reset();
454+
}
455+
456+
@Test
457+
void shouldServeTheSameStageOnInterruptAsync() {
458+
// Given
459+
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
460+
UnmanagedTransaction tx = beginTx(connection);
461+
462+
// When
463+
CompletionStage<Void> stage0 = tx.interruptAsync();
464+
CompletionStage<Void> stage1 = tx.interruptAsync();
465+
466+
// Then
467+
assertEquals(stage0, stage1);
468+
}
469+
443470
private static UnmanagedTransaction beginTx(Connection connection) {
444471
return beginTx(connection, Collections.emptySet());
445472
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.reactive;
20+
21+
import static java.util.concurrent.CompletableFuture.completedFuture;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.then;
24+
import static org.mockito.Mockito.mock;
25+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
26+
27+
import org.junit.jupiter.api.Test;
28+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
29+
import reactor.test.StepVerifier;
30+
31+
public class InternalReactiveTransactionTest {
32+
private InternalReactiveTransaction tx;
33+
34+
@Test
35+
void shouldDelegateInterrupt() {
36+
// Given
37+
UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
38+
given(utx.interruptAsync()).willReturn(completedFuture(null));
39+
tx = new InternalReactiveTransaction(utx);
40+
41+
// When
42+
StepVerifier.create(tx.interrupt()).expectComplete().verify();
43+
44+
// Then
45+
then(utx).should().interruptAsync();
46+
}
47+
48+
@Test
49+
void shouldDelegateInterruptAndReportError() {
50+
// Given
51+
UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
52+
RuntimeException e = mock(RuntimeException.class);
53+
given(utx.interruptAsync()).willReturn(failedFuture(e));
54+
tx = new InternalReactiveTransaction(utx);
55+
56+
// When
57+
StepVerifier.create(tx.interrupt()).expectErrorMatches(ar -> ar == e).verify();
58+
59+
// Then
60+
then(utx).should().interruptAsync();
61+
}
62+
}

0 commit comments

Comments
 (0)