From 960d77b634e145e99fa97f5b8fd183ee3a03197c Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Mon, 6 Jun 2022 13:54:35 +0100 Subject: [PATCH 1/2] Add transaction interruption support for internal use This update is for Neo4j internal use only. --- .../internal/async/UnmanagedTransaction.java | 18 ++++++ .../reactive/InternalReactiveTransaction.java | 11 ++++ .../async/UnmanagedTransactionTest.java | 27 ++++++++ .../InternalReactiveTransactionTest.java | 62 +++++++++++++++++++ 4 files changed, 118 insertions(+) create mode 100644 driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index e16179ba67..3b60d35900 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -91,6 +91,7 @@ private enum State { private CompletableFuture commitFuture; private CompletableFuture rollbackFuture; private Throwable causeOfTermination; + private CompletionStage interruptStage; public UnmanagedTransaction(Connection connection, BookmarksHolder bookmarksHolder, long fetchSize) { this(connection, bookmarksHolder, fetchSize, new ResultCursorsHolder()); @@ -303,4 +304,21 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul return stage; } + + /** + * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + *

+ * THIS METHOD IS NOT PART OF PUBLIC API + * + * @return {@code RESET} response stage + */ + public CompletionStage interruptAsync() { + return executeWithLock(lock, () -> { + if (interruptStage == null) { + markTerminated(null); + interruptStage = connection.reset(); + } + return interruptStage; + }); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java index dc44b02f08..c1875fc293 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java @@ -57,4 +57,15 @@ public Publisher run(Query query) { }) .map(InternalReactiveResult::new); } + + /** + * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + *

+ * THIS METHOD IS NOT PART OF PUBLIC API + * + * @return {@code RESET} response publisher + */ + public Publisher interrupt() { + return Mono.fromCompletionStage(tx.interruptAsync()); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 92397c1807..7f6e012cfa 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -440,6 +440,33 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt assertNull(closeStage.toCompletableFuture().join()); } + @Test + void shouldInterruptOnInterruptAsync() { + // Given + Connection connection = connectionMock(BoltProtocolV4.INSTANCE); + UnmanagedTransaction tx = beginTx(connection); + + // When + await(tx.interruptAsync()); + + // Then + then(connection).should().reset(); + } + + @Test + void shouldServeTheSameStageOnInterruptAsync() { + // Given + Connection connection = connectionMock(BoltProtocolV4.INSTANCE); + UnmanagedTransaction tx = beginTx(connection); + + // When + CompletionStage stage0 = tx.interruptAsync(); + CompletionStage stage1 = tx.interruptAsync(); + + // Then + assertEquals(stage0, stage1); + } + private static UnmanagedTransaction beginTx(Connection connection) { return beginTx(connection, Collections.emptySet()); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java new file mode 100644 index 0000000000..c8c426a72d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.neo4j.driver.internal.util.Futures.failedFuture; + +import org.junit.jupiter.api.Test; +import org.neo4j.driver.internal.async.UnmanagedTransaction; +import reactor.test.StepVerifier; + +public class InternalReactiveTransactionTest { + private InternalReactiveTransaction tx; + + @Test + void shouldDelegateInterrupt() { + // Given + UnmanagedTransaction utx = mock(UnmanagedTransaction.class); + given(utx.interruptAsync()).willReturn(completedFuture(null)); + tx = new InternalReactiveTransaction(utx); + + // When + StepVerifier.create(tx.interrupt()).expectComplete().verify(); + + // Then + then(utx).should().interruptAsync(); + } + + @Test + void shouldDelegateInterruptAndReportError() { + // Given + UnmanagedTransaction utx = mock(UnmanagedTransaction.class); + RuntimeException e = mock(RuntimeException.class); + given(utx.interruptAsync()).willReturn(failedFuture(e)); + tx = new InternalReactiveTransaction(utx); + + // When + StepVerifier.create(tx.interrupt()).expectErrorMatches(ar -> ar == e).verify(); + + // Then + then(utx).should().interruptAsync(); + } +} From 4d64c4cdbf5737e1ae85148d6ac88cefb87233ba Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Wed, 8 Jun 2022 10:11:33 +0100 Subject: [PATCH 2/2] Add additional warning to documentation --- .../org/neo4j/driver/internal/async/UnmanagedTransaction.java | 2 +- .../driver/internal/reactive/InternalReactiveTransaction.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 3b60d35900..b2c38a2a7c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -308,7 +308,7 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul /** * Marks transaction as terminated and sends {@code RESET} message over allocated connection. *

- * THIS METHOD IS NOT PART OF PUBLIC API + * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. * * @return {@code RESET} response stage */ diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java index c1875fc293..adebf16337 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java @@ -61,7 +61,7 @@ public Publisher run(Query query) { /** * Marks transaction as terminated and sends {@code RESET} message over allocated connection. *

- * THIS METHOD IS NOT PART OF PUBLIC API + * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. * * @return {@code RESET} response publisher */