diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index e16179ba67..b2c38a2a7c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -91,6 +91,7 @@ private enum State { private CompletableFuture commitFuture; private CompletableFuture rollbackFuture; private Throwable causeOfTermination; + private CompletionStage interruptStage; public UnmanagedTransaction(Connection connection, BookmarksHolder bookmarksHolder, long fetchSize) { this(connection, bookmarksHolder, fetchSize, new ResultCursorsHolder()); @@ -303,4 +304,21 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul return stage; } + + /** + * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + *

+ * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. + * + * @return {@code RESET} response stage + */ + public CompletionStage interruptAsync() { + return executeWithLock(lock, () -> { + if (interruptStage == null) { + markTerminated(null); + interruptStage = connection.reset(); + } + return interruptStage; + }); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java index dc44b02f08..adebf16337 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java @@ -57,4 +57,15 @@ public Publisher run(Query query) { }) .map(InternalReactiveResult::new); } + + /** + * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + *

+ * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. + * + * @return {@code RESET} response publisher + */ + public Publisher interrupt() { + return Mono.fromCompletionStage(tx.interruptAsync()); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 92397c1807..7f6e012cfa 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -440,6 +440,33 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt assertNull(closeStage.toCompletableFuture().join()); } + @Test + void shouldInterruptOnInterruptAsync() { + // Given + Connection connection = connectionMock(BoltProtocolV4.INSTANCE); + UnmanagedTransaction tx = beginTx(connection); + + // When + await(tx.interruptAsync()); + + // Then + then(connection).should().reset(); + } + + @Test + void shouldServeTheSameStageOnInterruptAsync() { + // Given + Connection connection = connectionMock(BoltProtocolV4.INSTANCE); + UnmanagedTransaction tx = beginTx(connection); + + // When + CompletionStage stage0 = tx.interruptAsync(); + CompletionStage stage1 = tx.interruptAsync(); + + // Then + assertEquals(stage0, stage1); + } + private static UnmanagedTransaction beginTx(Connection connection) { return beginTx(connection, Collections.emptySet()); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java new file mode 100644 index 0000000000..c8c426a72d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.neo4j.driver.internal.util.Futures.failedFuture; + +import org.junit.jupiter.api.Test; +import org.neo4j.driver.internal.async.UnmanagedTransaction; +import reactor.test.StepVerifier; + +public class InternalReactiveTransactionTest { + private InternalReactiveTransaction tx; + + @Test + void shouldDelegateInterrupt() { + // Given + UnmanagedTransaction utx = mock(UnmanagedTransaction.class); + given(utx.interruptAsync()).willReturn(completedFuture(null)); + tx = new InternalReactiveTransaction(utx); + + // When + StepVerifier.create(tx.interrupt()).expectComplete().verify(); + + // Then + then(utx).should().interruptAsync(); + } + + @Test + void shouldDelegateInterruptAndReportError() { + // Given + UnmanagedTransaction utx = mock(UnmanagedTransaction.class); + RuntimeException e = mock(RuntimeException.class); + given(utx.interruptAsync()).willReturn(failedFuture(e)); + tx = new InternalReactiveTransaction(utx); + + // When + StepVerifier.create(tx.interrupt()).expectErrorMatches(ar -> ar == e).verify(); + + // Then + then(utx).should().interruptAsync(); + } +}