From 12d8ffed0bf16d722ccc5abba4e003fbf7796c48 Mon Sep 17 00:00:00 2001 From: Love Leifland Date: Wed, 8 Mar 2023 08:45:24 +0100 Subject: [PATCH] Restore Session.reset as a private API This reverts commit 33447f573986c6117fa98ec7ccc85b787830cd15. --- .../driver/internal/InternalSession.java | 7 + .../driver/internal/async/NetworkSession.java | 18 + .../internal/reactive/InternalRxSession.java | 6 + .../driver/integration/SessionResetIT.java | 754 ++++++++++++++++++ .../async/InternalAsyncTransactionTest.java | 16 + .../internal/async/NetworkSessionTest.java | 36 + .../reactive/InternalRxSessionTest.java | 16 + .../driver/testutil/DatabaseExtension.java | 10 + .../org/neo4j/driver/testutil/TestUtil.java | 38 + .../test/resources/longRunningStatement.jar | Bin 4675 -> 4538 bytes test-procedures/README.md | 8 + test-procedures/pom.xml | 54 ++ .../neo4j/driver/LongRunningProcedures.java | 84 ++ 13 files changed, 1047 insertions(+) create mode 100644 driver/src/test/java/org/neo4j/driver/integration/SessionResetIT.java create mode 100644 test-procedures/README.md create mode 100644 test-procedures/pom.xml create mode 100644 test-procedures/src/main/java/org/neo4j/driver/LongRunningProcedures.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index 4e82b18bcc..ba0c1e158e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -140,6 +140,13 @@ public Set lastBookmarks() { return session.lastBookmarks(); } + // Private API + public void reset() { + Futures.blockingGet( + session.resetAsync(), + () -> terminateConnectionOnThreadInterrupt("Thread interrupted while resetting the session")); + } + private T transaction( AccessMode mode, @SuppressWarnings("deprecation") TransactionWork work, TransactionConfig config) { // use different code path compared to async so that work is executed in the caller thread diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 57b02a713c..be43e241e8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -164,6 +164,24 @@ public CompletionStage beginTransactionAsync( return newTransactionStage; } + // Private API + public CompletionStage resetAsync() { + return existingTransactionOrNull() + .thenAccept(tx -> { + if (tx != null) { + tx.markTerminated(null); + } + }) + .thenCompose(ignore -> connectionStage) + .thenCompose(connection -> { + if (connection != null) { + // there exists an active connection, send a RESET message over it + return connection.reset(); + } + return completedWithNull(); + }); + } + public RetryLogic retryLogic() { return retryLogic; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 7dd371c34a..8ac4c6817f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.reactive; +import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; + import java.util.Map; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.AccessMode; @@ -129,6 +131,10 @@ public Bookmark lastBookmark() { return InternalBookmark.from(session.lastBookmarks()); } + public Publisher reset() { + return createEmptyPublisher(session::resetAsync); + } + @Override public Publisher close() { return doClose(); diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionResetIT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionResetIT.java new file mode 100644 index 0000000000..bf911d356d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/integration/SessionResetIT.java @@ -0,0 +1,754 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.integration; + +import static java.util.Collections.newSetFromMap; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.IntStream.range; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.neo4j.driver.Values.parameters; +import static org.neo4j.driver.testutil.DaemonThreadFactory.daemon; +import static org.neo4j.driver.testutil.TestUtil.activeQueryCount; +import static org.neo4j.driver.testutil.TestUtil.activeQueryNames; +import static org.neo4j.driver.testutil.TestUtil.await; +import static org.neo4j.driver.testutil.TestUtil.awaitCondition; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Result; +import org.neo4j.driver.Session; +import org.neo4j.driver.SimpleQueryRunner; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.exceptions.ServiceUnavailableException; +import org.neo4j.driver.exceptions.TransientException; +import org.neo4j.driver.internal.InternalSession; +import org.neo4j.driver.testutil.DatabaseExtension; +import org.neo4j.driver.testutil.ParallelizableIT; +import org.neo4j.driver.testutil.TestUtil; +import org.testcontainers.utility.MountableFile; + +@ParallelizableIT +class SessionResetIT { + private static final int CSV_FILE_SIZE = 10_000; + private static final int LOAD_CSV_BATCH_SIZE = 10; + + private static final String SHORT_QUERY_1 = "CREATE (n:Node {name: 'foo', occupation: 'bar'})"; + private static final String SHORT_QUERY_2 = "MATCH (n:Node {name: 'foo'}) RETURN count(n)"; + private static final String LONG_QUERY = "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n"; + private static final String LONG_PERIODIC_COMMIT_QUERY_TEMPLATE = + """ + USING PERIODIC COMMIT 1 + LOAD CSV FROM '%%s' AS line + UNWIND range(1, %d) AS index + CREATE (n:Node {id: index, name: line[0], occupation: line[1]}) + """ + .formatted(LOAD_CSV_BATCH_SIZE); + private static final String LONG_CALL_IN_TX_QUERY_TEMPLATE = + """ + LOAD CSV FROM '%%s' AS line + CALL { + WITH line + UNWIND range(1, %d) as index + CREATE (n:Node {id: index, name: line[0], occupation: line[1]}) + } IN TRANSACTIONS OF 1 ROW + """ + .formatted(LOAD_CSV_BATCH_SIZE); + + private static final int STRESS_TEST_THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2; + private static final long STRESS_TEST_DURATION_MS = SECONDS.toMillis(5); + private static final String[] STRESS_TEST_QUERIES = {SHORT_QUERY_1, SHORT_QUERY_2, LONG_QUERY}; + private static final String LONG_RUNNING_PLUGIN_PATH = "/longRunningStatement.jar"; + + @RegisterExtension + static final DatabaseExtension neo4j = + new DatabaseExtension().installPlugin(MountableFile.forClasspathResource(LONG_RUNNING_PLUGIN_PATH)); + + private ExecutorService executor; + + @BeforeEach + void setUp() { + executor = Executors.newCachedThreadPool(daemon(getClass().getSimpleName() + "-thread")); + } + + @AfterEach + void tearDown() { + if (executor != null) { + executor.shutdownNow(); + } + } + + @Test + void shouldTerminateAutoCommitQuery() { + testQueryTermination(LONG_QUERY, true); + } + + @Test + void shouldTerminateQueryInUnmanagedTransaction() { + testQueryTermination(LONG_QUERY, false); + } + + /** + * It is currently unsafe to terminate periodic commit query because it'll then be half-committed. + * So the driver give no guarantee when the periodic commit could be terminated. + * For a user who want to terminate a periodic commit, he or she should use kill query by id. + */ + @Test + void shouldTerminatePeriodicCommitQueryRandomly() { + Future queryResult = runQueryInDifferentThreadAndResetSession(longPeriodicCommitQuery(), true); + + final var e = assertThrows(ExecutionException.class, () -> queryResult.get(1, MINUTES)); + assertThat(e.getMessage(), containsString("The transaction has been terminated")); + assertThat(e.getCause(), instanceOf(Neo4jException.class)); + + awaitNoActiveQueries(); + + assertThat(countNodes(), lessThanOrEqualTo(((long) CSV_FILE_SIZE) * LOAD_CSV_BATCH_SIZE)); + } + + @Test + void shouldTerminateAutoCommitQueriesRandomly() throws Exception { + testRandomQueryTermination(true); + } + + @Test + void shouldTerminateQueriesInUnmanagedTransactionsRandomly() throws Exception { + testRandomQueryTermination(false); + } + + @Test + void shouldRejectNewTransactionWhenOpenTransactionExistsAndShouldFailRunResultOnSessionReset() { + try (Session session = neo4j.driver().session()) { + Transaction tx1 = session.beginTransaction(); + + CompletableFuture txRunFuture = CompletableFuture.runAsync( + () -> tx1.run("CALL test.driver.longRunningStatement($seconds)", parameters("seconds", 10))); + + awaitActiveQueriesToContain("CALL test.driver.longRunningStatement"); + ((InternalSession) session).reset(); + + ClientException e1 = assertThrows(ClientException.class, session::beginTransaction); + assertThat( + e1.getMessage(), + containsString("You cannot begin a transaction on a session with an open transaction")); + + ClientException e2 = assertThrows(ClientException.class, () -> tx1.run("RETURN 1")); + assertThat(e2.getMessage(), containsString("Cannot run more queries in this transaction")); + + // Make sure failure from the terminated long running query is propagated + Neo4jException e3 = assertThrows(Neo4jException.class, () -> await(txRunFuture)); + assertThat(e3.getMessage(), containsString("The transaction has been terminated")); + } + } + + @Test + void shouldSuccessfullyCloseAfterSessionReset() { + try (Session session = neo4j.driver().session()) { + CompletableFuture.runAsync( + () -> session.run("CALL test.driver.longRunningStatement($seconds)", parameters("seconds", 10))); + + awaitActiveQueriesToContain("CALL test.driver.longRunningStatement"); + ((InternalSession) session).reset(); + } + } + + @Test + void shouldBeAbleToBeginNewTransactionAfterFirstTransactionInterruptedBySessionResetIsClosed() { + try (Session session = neo4j.driver().session()) { + Transaction tx1 = session.beginTransaction(); + + CompletableFuture txRunFuture = runAsync( + () -> tx1.run("CALL test.driver.longRunningStatement($seconds)", parameters("seconds", 10))); + + awaitActiveQueriesToContain("CALL test.driver.longRunningStatement"); + ((InternalSession) session).reset(); + + Neo4jException e = assertThrows(Neo4jException.class, () -> await(txRunFuture)); + assertThat(e.getMessage(), containsString("The transaction has been terminated")); + tx1.close(); + + try (Transaction tx2 = session.beginTransaction()) { + tx2.run("CREATE (n:FirstNode)"); + tx2.commit(); + } + + Result result = session.run("MATCH (n) RETURN count(n)"); + long nodes = result.single().get("count(n)").asLong(); + MatcherAssert.assertThat(nodes, equalTo(1L)); + } + } + + @Test + void shouldKillLongRunningQuery() { + final int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + final AtomicLong startTime = new AtomicLong(-1); + long endTime; + + try (Session session = neo4j.driver().session()) { + CompletableFuture sessionRunFuture = CompletableFuture.runAsync(() -> { + // When + startTime.set(System.currentTimeMillis()); + session.run("CALL test.driver.longRunningStatement($seconds)", parameters("seconds", executionTimeout)); + }); + + resetSessionAfterTimeout(session, killTimeout); + + assertThrows(Neo4jException.class, () -> await(sessionRunFuture)); + } + + endTime = System.currentTimeMillis(); + assertTrue(startTime.get() > 0); + assertTrue(endTime - startTime.get() > killTimeout * 1000); // get reset by session.reset + assertTrue(endTime - startTime.get() < executionTimeout * 1000 / 2); // finished before execution finished + } + + @Test + void shouldKillLongStreamingResult() { + // Given + final int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + final AtomicInteger recordCount = new AtomicInteger(); + final AtomicLong startTime = new AtomicLong(-1); + long endTime; + + Neo4jException e = assertThrows(Neo4jException.class, () -> { + try (Session session = neo4j.driver().session()) { + Result result = session.run( + "CALL test.driver.longStreamingResult($seconds)", parameters("seconds", executionTimeout)); + + resetSessionAfterTimeout(session, killTimeout); + + // When + startTime.set(System.currentTimeMillis()); + while (result.hasNext()) { + result.next(); + recordCount.incrementAndGet(); + } + } + }); + + endTime = System.currentTimeMillis(); + assertThat(e.getMessage(), containsString("The transaction has been terminated")); + assertThat(recordCount.get(), greaterThan(1)); + + assertTrue(startTime.get() > 0); + assertTrue(endTime - startTime.get() > killTimeout * 1000); // get reset by session.reset + assertTrue(endTime - startTime.get() < executionTimeout * 1000 / 2); // finished before execution finished + } + + private void resetSessionAfterTimeout(Session session, int timeout) { + executor.submit(() -> { + try { + Thread.sleep(timeout * 1000); // let the query execute for timeout seconds + } catch (InterruptedException ignore) { + } finally { + ((InternalSession) session).reset(); // reset the session after timeout + } + }); + } + + @Test + void shouldAllowMoreQueriesAfterSessionReset() { + // Given + try (Session session = neo4j.driver().session()) { + + session.run("RETURN 1").consume(); + + // When reset the state of this session + ((InternalSession) session).reset(); + + // Then can run successfully more queries without any error + session.run("RETURN 2").consume(); + } + } + + @Test + void shouldAllowMoreTxAfterSessionReset() { + // Given + try (Session session = neo4j.driver().session()) { + try (Transaction tx = session.beginTransaction()) { + tx.run("RETURN 1"); + tx.commit(); + } + + // When reset the state of this session + ((InternalSession) session).reset(); + + // Then can run more Tx + try (Transaction tx = session.beginTransaction()) { + tx.run("RETURN 2"); + tx.commit(); + } + } + } + + @Test + void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() { + // Given + try (Session session = neo4j.driver().session()) { + Transaction tx = session.beginTransaction(); + // When reset the state of this session + ((InternalSession) session).reset(); + + // Then + Exception e = assertThrows(Exception.class, () -> { + tx.run("RETURN 1"); + tx.commit(); + }); + assertThat(e.getMessage(), startsWith("Cannot run more queries in this transaction")); + } + } + + @Test + void shouldAllowMoreTxAfterSessionResetInTx() { + // Given + try (Session session = neo4j.driver().session()) { + try (Transaction ignore = session.beginTransaction()) { + // When reset the state of this session + ((InternalSession) session).reset(); + } + + // Then can run more Tx + try (Transaction tx = session.beginTransaction()) { + tx.run("RETURN 2"); + tx.commit(); + } + } + } + + @Test + void resetShouldStopQueryWaitingForALock() throws Exception { + testResetOfQueryWaitingForLock(new NodeIdUpdater() { + @Override + void performUpdate( + Driver driver, + int nodeId, + int newNodeId, + AtomicReference usedSessionRef, + CountDownLatch latchToWait) + throws Exception { + try (Session session = driver.session()) { + usedSessionRef.set(session); + latchToWait.await(); + Result result = updateNodeId(session, nodeId, newNodeId); + result.consume(); + } + } + }); + } + + @Test + void resetShouldStopTransactionWaitingForALock() throws Exception { + testResetOfQueryWaitingForLock(new NodeIdUpdater() { + @Override + public void performUpdate( + Driver driver, + int nodeId, + int newNodeId, + AtomicReference usedSessionRef, + CountDownLatch latchToWait) + throws Exception { + try (Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction()) { + usedSessionRef.set(session); + latchToWait.await(); + Result result = updateNodeId(tx, nodeId, newNodeId); + result.consume(); + } + } + }); + } + + @Test + void resetShouldStopWriteTransactionWaitingForALock() throws Exception { + AtomicInteger invocationsOfWork = new AtomicInteger(); + + testResetOfQueryWaitingForLock(new NodeIdUpdater() { + @Override + public void performUpdate( + Driver driver, + int nodeId, + int newNodeId, + AtomicReference usedSessionRef, + CountDownLatch latchToWait) + throws Exception { + try (Session session = driver.session()) { + usedSessionRef.set(session); + latchToWait.await(); + + session.executeWrite(tx -> { + invocationsOfWork.incrementAndGet(); + Result result = updateNodeId(tx, nodeId, newNodeId); + result.consume(); + return null; + }); + } + } + }); + + assertEquals(1, invocationsOfWork.get()); + } + + @Test + void shouldBeAbleToRunMoreQueriesAfterResetOnNoErrorState() { + try (Session session = neo4j.driver().session()) { + // Given + ((InternalSession) session).reset(); + + // When + Transaction tx = session.beginTransaction(); + tx.run("CREATE (n:FirstNode)"); + tx.commit(); + + // Then the outcome of both queries should be visible + Result result = session.run("MATCH (n) RETURN count(n)"); + long nodes = result.single().get("count(n)").asLong(); + assertThat(nodes, equalTo(1L)); + } + } + + @Test + void shouldHandleResetBeforeRun() { + try (Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction()) { + ((InternalSession) session).reset(); + + ClientException e = assertThrows(ClientException.class, () -> tx.run("CREATE (n:FirstNode)")); + assertThat(e.getMessage(), containsString("Cannot run more queries in this transaction")); + } + } + + @Test + void shouldHandleResetFromMultipleThreads() throws Throwable { + Session session = neo4j.driver().session(); + + CountDownLatch beforeCommit = new CountDownLatch(1); + CountDownLatch afterReset = new CountDownLatch(1); + + Future txFuture = executor.submit(() -> { + Transaction tx1 = session.beginTransaction(); + tx1.run("CREATE (n:FirstNode)"); + beforeCommit.countDown(); + afterReset.await(); + + // session has been reset, it should not be possible to commit the transaction + try { + tx1.commit(); + } catch (Neo4jException ignore) { + } + + try (Transaction tx2 = session.beginTransaction()) { + tx2.run("CREATE (n:SecondNode)"); + tx2.commit(); + } + + return null; + }); + + Future resetFuture = executor.submit(() -> { + beforeCommit.await(); + ((InternalSession) session).reset(); + afterReset.countDown(); + return null; + }); + + executor.shutdown(); + executor.awaitTermination(20, SECONDS); + + txFuture.get(20, SECONDS); + resetFuture.get(20, SECONDS); + + assertEquals(0, countNodes("FirstNode")); + assertEquals(1, countNodes("SecondNode")); + } + + private void testResetOfQueryWaitingForLock(NodeIdUpdater nodeIdUpdater) throws Exception { + int nodeId = 42; + int newNodeId1 = 4242; + int newNodeId2 = 424242; + + createNodeWithId(nodeId); + + CountDownLatch nodeLocked = new CountDownLatch(1); + AtomicReference otherSessionRef = new AtomicReference<>(); + + try (Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction()) { + Future txResult = nodeIdUpdater.update(nodeId, newNodeId1, otherSessionRef, nodeLocked); + + Result result = updateNodeId(tx, nodeId, newNodeId2); + result.consume(); + + nodeLocked.countDown(); + // give separate thread some time to block on a lock + Thread.sleep(2_000); + ((InternalSession) otherSessionRef.get()).reset(); + + assertTransactionTerminated(txResult); + tx.commit(); + } + + try (Session session = neo4j.driver().session()) { + Result result = session.run("MATCH (n) RETURN n.id AS id"); + int value = result.single().get("id").asInt(); + assertEquals(newNodeId2, value); + } + } + + private void createNodeWithId(int id) { + try (Session session = neo4j.driver().session()) { + session.run("CREATE (n {id: $id})", parameters("id", id)); + } + } + + private static Result updateNodeId(SimpleQueryRunner queryRunner, int currentId, int newId) { + return queryRunner.run( + "MATCH (n {id: $currentId}) SET n.id = $newId", parameters("currentId", currentId, "newId", newId)); + } + + private static void assertTransactionTerminated(Future work) { + ExecutionException e = assertThrows(ExecutionException.class, () -> work.get(20, TimeUnit.SECONDS)); + assertThat(e.getCause(), CoreMatchers.instanceOf(ClientException.class)); + assertThat(e.getCause().getMessage(), startsWith("The transaction has been terminated")); + } + + private void testRandomQueryTermination(boolean autoCommit) throws Exception { + Set runningSessions = newSetFromMap(new ConcurrentHashMap<>()); + AtomicBoolean stop = new AtomicBoolean(); + List> futures = new ArrayList<>(); + + for (int i = 0; i < STRESS_TEST_THREAD_COUNT; i++) { + futures.add(executor.submit(() -> { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (!stop.get()) { + runRandomQuery(autoCommit, random, runningSessions, stop); + } + })); + } + + long deadline = System.currentTimeMillis() + STRESS_TEST_DURATION_MS; + while (!stop.get()) { + if (System.currentTimeMillis() > deadline) { + stop.set(true); + } + + resetAny(runningSessions); + + MILLISECONDS.sleep(30); + } + + futures.forEach(TestUtil::await); + awaitNoActiveQueries(); + } + + private void runRandomQuery(boolean autoCommit, Random random, Set runningSessions, AtomicBoolean stop) { + try { + Session session = neo4j.driver().session(); + runningSessions.add(session); + try { + String query = STRESS_TEST_QUERIES[random.nextInt(STRESS_TEST_QUERIES.length - 1)]; + runQuery(session, query, autoCommit); + } finally { + runningSessions.remove(session); + session.close(); + } + } catch (Throwable error) { + if (!stop.get() && !isAcceptable(error)) { + stop.set(true); + throw error; + } + // else it is fine to receive some errors from the driver because + // sessions are being reset concurrently by the main thread, driver can also be closed concurrently + } + } + + private void testQueryTermination(String query, boolean autoCommit) { + Future queryResult = runQueryInDifferentThreadAndResetSession(query, autoCommit); + ExecutionException e = assertThrows(ExecutionException.class, () -> queryResult.get(10, SECONDS)); + assertThat(e.getCause(), instanceOf(Neo4jException.class)); + awaitNoActiveQueries(); + } + + private Future runQueryInDifferentThreadAndResetSession(String query, boolean autoCommit) { + AtomicReference sessionRef = new AtomicReference<>(); + + Future queryResult = runAsync(() -> { + Session session = neo4j.driver().session(); + sessionRef.set(session); + runQuery(session, query, autoCommit); + }); + + awaitActiveQueriesToContain(query); + + Session session = sessionRef.get(); + assertNotNull(session); + ((InternalSession) session).reset(); + + return queryResult; + } + + private static void runQuery(Session session, String query, boolean autoCommit) { + if (autoCommit) { + session.run(query).consume(); + } else { + try (Transaction tx = session.beginTransaction()) { + tx.run(query); + tx.commit(); + } + } + } + + private void awaitNoActiveQueries() { + awaitCondition(() -> activeQueryCount(neo4j) == 0); + } + + private void awaitActiveQueriesToContain(String value) { + awaitCondition(() -> activeQueryNames(neo4j).stream().anyMatch(query -> query.contains(value))); + } + + private long countNodes() { + return countNodes(null); + } + + private long countNodes(String label) { + try (Session session = neo4j.driver().session()) { + Result result = + session.run("MATCH (n" + (label == null ? "" : ":" + label) + ") RETURN count(n) AS result"); + return result.single().get(0).asLong(); + } + } + + private static void resetAny(Set sessions) { + sessions.stream().findAny().ifPresent(session -> { + if (sessions.remove(session)) { + resetSafely(session); + } + }); + } + + private static void resetSafely(Session session) { + try { + if (session.isOpen()) { + ((InternalSession) session).reset(); + } + } catch (ClientException e) { + if (session.isOpen()) { + throw e; + } + // else this thread lost race with close and it's fine + } + } + + private static boolean isAcceptable(Throwable error) { + // get the root cause + while (error.getCause() != null) { + error = error.getCause(); + } + + return isTransactionTerminatedException(error) + || error instanceof ServiceUnavailableException + || error instanceof ClientException + || error instanceof ClosedChannelException; + } + + private static boolean isTransactionTerminatedException(Throwable error) { + return error instanceof TransientException + && error.getMessage().startsWith("The transaction has been terminated") + || error.getMessage().startsWith("Trying to execute query in a terminated transaction"); + } + + private String longPeriodicCommitQuery() { + URI fileUri = createTmpCsvFile(); + final var query = + neo4j.isNeo4j44OrEarlier() ? LONG_PERIODIC_COMMIT_QUERY_TEMPLATE : LONG_CALL_IN_TX_QUERY_TEMPLATE; + return String.format(query, fileUri); + } + + private static URI createTmpCsvFile() { + try { + final String content = range(0, CSV_FILE_SIZE) + .mapToObj(i -> "Foo-" + i + ", Bar-" + i) + .collect(Collectors.joining("\n")); + final String path = neo4j.addImportFile(SessionResetIT.class.getSimpleName(), ".csv", content); + return URI.create(path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private abstract class NodeIdUpdater { + final Future update( + int nodeId, int newNodeId, AtomicReference usedSessionRef, CountDownLatch latchToWait) { + return executor.submit(() -> { + performUpdate(neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait); + return null; + }); + } + + abstract void performUpdate( + Driver driver, + int nodeId, + int newNodeId, + AtomicReference usedSessionRef, + CountDownLatch latchToWait) + throws Exception; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java index a950ee1e03..191fb448b6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java @@ -21,6 +21,8 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -53,11 +55,13 @@ import org.neo4j.driver.Value; import org.neo4j.driver.async.AsyncTransaction; import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.summary.ResultSummary; @@ -120,6 +124,18 @@ void shouldRollback() { assertFalse(tx.isOpen()); } + @Test + void shouldRollbackWhenFailedRun() { + Futures.blockingGet(networkSession.resetAsync()); + ClientException clientException = assertThrows(ClientException.class, () -> await(tx.commitAsync())); + + assertThat( + clientException.getMessage(), + containsString("It has been rolled back either because of an error or explicit termination")); + verify(connection).release(); + assertFalse(tx.isOpen()); + } + @Test void shouldReleaseConnectionWhenFailedToCommit() { setupFailingCommit(connection); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java index 5aba0c63c7..2e8016a9a2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java @@ -72,6 +72,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.testutil.TestUtil; class NetworkSessionTest { private static final String DATABASE = "neo4j"; @@ -197,6 +198,13 @@ void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { inOrder.verify(connection, atLeastOnce()).release(); } + @Test + void resetDoesNothingWhenNoTransactionAndNoConnection() { + TestUtil.await(session.resetAsync()); + + verify(connectionProvider, never()).acquireConnection(any(ConnectionContext.class)); + } + @Test void closeWithoutConnection() { NetworkSession session = newSession(connectionProvider); @@ -312,6 +320,22 @@ void testPassingNoBookmarkShouldRetainBookmark() { assertThat(session.lastBookmarks(), equalTo(bookmarks)); } + @Test + void connectionShouldBeResetAfterSessionReset() { + String query = "RETURN 1"; + setupSuccessfulRunAndPull(connection, query); + + run(session, query); + + InOrder connectionInOrder = inOrder(connection); + connectionInOrder.verify(connection, never()).reset(); + connectionInOrder.verify(connection).release(); + + await(session.resetAsync()); + connectionInOrder.verify(connection).reset(); + connectionInOrder.verify(connection, never()).release(); + } + @Test void shouldHaveEmptyLastBookmarksInitially() { assertTrue(session.lastBookmarks().isEmpty()); @@ -438,6 +462,18 @@ void shouldBeginTxAfterRunFailureToAcquireConnection() { verifyBeginTx(connection); } + @Test + void shouldMarkTransactionAsTerminatedAndThenResetConnectionOnReset() { + UnmanagedTransaction tx = beginTransaction(session); + + assertTrue(tx.isOpen()); + verify(connection, never()).reset(); + + TestUtil.await(session.resetAsync()); + + verify(connection).reset(); + } + private static ResultCursor run(NetworkSession session, String query) { return await(session.runAsync(new Query(query), TransactionConfig.empty())); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index e43b91bdf9..0afbbc28d6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -295,6 +295,22 @@ void shouldDelegateBookmarks() { verifyNoMoreInteractions(session); } + @Test + void shouldDelegateReset() throws Throwable { + // Given + NetworkSession session = mock(NetworkSession.class); + when(session.resetAsync()).thenReturn(completedWithNull()); + InternalRxSession rxSession = new InternalRxSession(session); + + // When + Publisher mono = rxSession.reset(); + + // Then + StepVerifier.create(mono).verifyComplete(); + verify(session).resetAsync(); + verifyNoMoreInteractions(session); + } + @Test void shouldDelegateClose() { // Given diff --git a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java index 8bbba3e005..2ba6c271a4 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java @@ -182,6 +182,16 @@ public String addImportFile(String prefix, String suffix, String contents) throw return String.format("file:///%s", tmpFile.getName()); } + public DatabaseExtension installPlugin(MountableFile plugin) { + if (driver != null) driver.close(); + if (neo4jContainer != null) neo4jContainer.close(); + neo4jContainer = setupNeo4jContainer(cert, key, defaultConfig).withPlugins(plugin); + neo4jContainer.start(); + driver = GraphDatabase.driver(boltUri, authToken); + waitForBoltAvailability(); + return this; + } + public URI uri() { return boltUri; } diff --git a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java index a9320edb05..35351e9012 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java @@ -24,6 +24,7 @@ import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; @@ -61,7 +62,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import java.util.function.Predicate; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -485,6 +488,41 @@ public static void interruptWhenInWaitingState(Thread thread) { }); } + public static int activeQueryCount(DatabaseExtension db) { + return activeQueryNames(db).size(); + } + + public static List activeQueryNames(DatabaseExtension db) { + try (Session session = db.driver().session()) { + final var query = db.isNeo4j44OrEarlier() + ? "CALL dbms.listQueries() YIELD query RETURN query" + : "SHOW TRANSACTIONS YIELD currentQuery"; + return session.run(query).stream() + .map(record -> record.get(0).asString()) + .filter(q -> !q.contains(query)) // do not include show transactions query + .collect(toList()); + } + } + + public static void awaitCondition(BooleanSupplier condition) { + awaitCondition(condition, DEFAULT_WAIT_TIME_MS, MILLISECONDS); + } + + public static void awaitCondition(BooleanSupplier condition, long value, TimeUnit unit) { + long deadline = System.currentTimeMillis() + unit.toMillis(value); + while (!condition.getAsBoolean()) { + if (System.currentTimeMillis() > deadline) { + fail("Condition was not met in time"); + } + try { + MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Interrupted while waiting"); + } + } + } + public static String randomString(int size) { StringBuilder sb = new StringBuilder(size); ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/driver/src/test/resources/longRunningStatement.jar b/driver/src/test/resources/longRunningStatement.jar index a5416792a74e0165ae83de9283718e21de65339c..6bd9612a6a2ce56f31ab3cf99d72edc294acbffa 100644 GIT binary patch literal 4538 zcmbVP2UwHY5)GkBjWnr(h9UXG&L@pit1~w>34PNK}1A*X&@qk zZ@RiGOvL-8re+Yrx^Fs!AtGuN{sFSSijnIT`4c!rgDXCXdx&ly3|5fwudsC(;tnlx2d-Q9L9D$1rkDR9th{7 z=#L}zQ{P9pLp`Kj{sZphzu+7ZFccJV1gH38?pi?iXq<@%a|Q$eq<#h0^Kf@I@pgBI zxjP#nJRG2o-Uz4{kD)gb?u`_6aJBRD!W!#CT2w$c$MW^r9h9HB>^x$+Yu$5|gf2Qc zY?yx3x-B5BRD##t5wjlb2Wq>e+{DFekjouS4jsuDk?XpjD)+>rTMR(^v;Z8(%h_r9 zN>xL@r3fuU)|Q}#&Q*BkWIk3A{#N$pdkzW5c-!}AerlJB>Hv)gctiP1%}CM}t!f8R zv681++9T_3opt5;?0d?pGWDLsXIa(X+uAhEQ#M1}E5bq;3^elJGLI{#RPpy^c%~+C zYN*w`VAY+R#b~cy`wq?;)#(-%J8M7a&?ELkd-O+toX|j!{&>QiUB`wWD05r&k==v} zGdtTNnQX84ozRzMn@?o5ABv@t;m=i{7~N(kJvR{x?Fv4XrV1frw~;c{p3e$Uxpfcw zfJQRu!##$CS+)4989}xJ$dK(pux0UXct3Gc0MH~QPcDljLLEU%4E|jOYoDa@kb>vx z325$RBb`;|vXo&#N=NVLN7Hk61IQ0I%)z?7>6_+ABqMBQS^z-!e_pqP1$$;;<;7^m zxVtQjbm~i#CllupCjzIcW9%*`sqjRpM`A*F$|J{cdQz~yw-->G>K__c>S|UXOopZ~ zY(a=wqcwD!EAXqF<-KncnP>D8?PBTF^e4wQGiT0EDr}Cw5ev56I?+TM$fU1;0=k)^^-O zx0aD0N!;yv*xXxI{zjeLOvEgj6)vK{Pe+Tx&ix}9M>pqw?}zO+&S9;TFs zw^AC4ovYndlR_iwBa@N!u@*?1TLv^w7JX@+-0;2Be%}|;GNvdi6xgsAWx)h{()3{F zoJ2mPmn%gP=D0%&VemRl7uJLwNnBc0(p%+;O}z&d?N3j4nnf(CTcevdPIa?8zFTw5 z)N#@Aw#b0ETZW*_ziH_Or%7dck3AeJ z?Kg4Ali}z=;rJkT^q%4=*Za$otU#ExY!`u@BI9Qw3c;z{(FHmvOQuD1|MikQf1M=H z=P{f8J8KuIOqpW_$Fm<+kPA*oVwalvnGt3<36_9tujyseO6?5Q*t?lPSSW)hSw^F)1M!^PUtWpAx`tEgM| zWZtaWe{t3I9;)FQaSzKZ&x%mimap4=4*A0ORdB%rGH|PAR`LqlMbn#j2AZtJE$XbL z8m0hmYJ2VMHTv!c7nZtMus{*6sFRv=HC}9uJdv^ZuB4RN&En45T+?Nf@A(-O!`!bE zcf5;(LO)E~^gcQh6bNHNipfX_i`;mzPAr@3>J(OLD7LDu!0c3Hw6n3a$Aw#!>l9sD z8gx);u>cN<3T2WG8H*^mPKDl>t9FW?F{vZo9t|IN$6Y`f8~+2A=(coo$PYMJb{O6wIC|LUDg}oqw2q?4+CO zI!9&ZN^cq1pX}55XPGM?kAX%l{Ng;RwMH@Ov_-h*>`K=2f)O+IkylSv;DBs$&x@BM z;rFufX2?)eKX7L4li|hIz>7?^m)LTgio*23&=Pwsur- zZt1rRNgr!W8>*iT#xzEvtF(E5IcGM=OKC~#WlZp7yYA~-L#_JJbZx`98Egp$DmROd ziEHz2wRpQ@thzeR%W}kvNWaNXIs#5)QDk*C++!AIU}U^=w@z}*&uc}6Tdg&M zcdiaAY*}8)oSr$04gR6%*rQEfYkEepGZ#pqK#pkRH`-5+5x~ zdO;!EH6-eNRfqw0mn@upLt;|sgv)QB*FgwC<5LT%GlDv2-i(&TQ(Sy;YAdBMZQB2Q z$vd6GmYVw&0dTQk7e0yGAC(Vl+woS1j!@f<`Z+f{6x98wfjJK5J`N_dNuqzWNuvMO z{v2nE{|{^=)C(yBKk9>yVxa(>A#|8_21Lm=ghncd8~~8}8{Xg^Zlb_bImou2cJMqihY?5K;wi1;^L&HI}FGA^R! zZyTxzoQ&ZR-(1|O69IELPo0awOd^2&HsN)hfo zqQTp(l^gWq6}B4VeT#j8C#AzLQLZCzhhv#6!`Ka5C6;EwJh`Y(r+2@&i2xOlYQfu2 zH4j{D4_H4Fu)DFa;J2}*1|R6=ILktz2d~8HZ@8&F*G;9GM#2;V&26X(@2!9DyorK* z*64x>P-#X;yF?)u^cOC_PprArF$^fB0(+H(-X-=@(E^7tl=ApsATuXj>!Gw+O}Wxy zG4O%M;@5NBnw1&~-K*Yr&8az7SQ$&h7(U}40rg6&I~sH@HHhTo_Qo&N|JGfTFD9vSDs%n>8%?hO zFH;sItF#cNAwYVf*79RW2kR#`rH2c_l}=JyV@gnw`SstD{ed zP``;$r5CLQ@`Q<*dMaLxYm=dK-qRla1la=~erX3?LV{LYz-|JyZJ(Iyulx$*AbDEHGxL2eNa8 z0FRgns4PwmS3gsn-odXgdUO-*8bOGFjKqH}2MKu*3nOn%aSEf4p>`<5Td zPq-h^{z)wU^l~M9B7rFo{!J*er-4~5MXbw zhamlrR_U-(9OiwT%zeluO!wa2&T;ZDD#u@Y+pir5w(spcVJ-hHlw;nGSCT{Tzpf>R z{_{hl{)YEIE6dN>{Pp+$a}Y(E|38KOL4KYFMi5ehJ_P`%3BP1w0Dy*|Q~`j00OgARQzF2}KBmA~GNZqy-2ebVXEM+k zDFPBeKxxv86al3u{xI`L{mvi%%>0w}*19X}?tSig_rA05J`1OyWCPIB(gK14DD(i| z1Ra19V4!2FDWzu!1s`+*02F|;a2ndf8RY+-cJ@!fY@``qzy_LzdQcsNsk8xf)1bRc zA1)>Rf(b4q-qYP%X>xKvZgLhQ3F~<#3768MrJ2b_CU8hR1^GQngUNZru<7E(Yz49= zn;2RDM`=+etZ);=(+e?F5gpw5^oIP zqchqWi!wpkdte--?cMCWy%UXJ88&OMs50of7e{i|XZabh+^^rsm5~8~j*&wLKID{G z5YYxBJeRv@u5L6juiuZZOuWH);Z4^{cZRA{Pj_RZ>Jz|lOYU{~IekXE@n&)ksZI=a z64MNKf2L>XdYoMau4T=yW4hwJOF>`a+h8qb>%{`?Ft;g4y-yQfoF+Ncu6_+&XJza~R;kbFe# zW_C8tpe=Fzx+(9($c*fW@rek|f>C1=s2W9~ti^KydVXUspXFUmlez#AUJ;}l7uTsW z7kj4Vy>{8Mrm|1WLX39~B6*5=r$jM`z6@D)UxMvxFYvLO5)GqEL-)q!50-A4?X}7W zgKriTBa-e#2O;<}3$%Pf4GV{8k8>5o z);od>04O1y=|4Xog8yTmf7{q}q$`5eoa@q&6C&5lSd61;0D@srtq(2EUA59MK6y`A z7ySN>9)^}7bqwo!wQqkLxRX!3^gdqBCzn`2Tx4FTx}0@)Pv~@YucaVM1h-W8>Z69W zz>InS3Ga$0pZC`xfR>$8VZn;PmSrv`p&f=#=NAQlEnMxXWt^NTUPoDvTK8Y=l4(6T z5-n!!4CNuZ%cl=E=<`yJ4>Z&q55J+aj-y;VPsRpn?#W81CK|6s-a%AN$YkBh5PEQC zqF!UVHr||E%9)6io4tU1OCIofY%Xtmj|uw5%VvH(tL#e#~FsB#CfH#H2D9`W!n zyqWO`k7=Emea32pSVNv|cF$Iv^({TP$u&KU*BGY5d^f64$*jMd8}!E5K)Lr*Yl6n` zW=zJMG8&^5-}98L1ekc1~RhS+(KHeElKnOJL~2mG2t%uD&qP&=I?Vp~n@UYSYTNst3zQ`A|=Wtt;i+-ATI#t63PGUsIoc!QNCy zJ>If^G{|;RCfJdm(MK`V?>rj*I->e0K|;^;f;) zOs{`gL4lMokvhu8H0;Tkux=;uI!Ux1xkKG)A>+y5;Q1gUh_NAJ$pxo3tEjNf7iv!R ztjJ&f*l@Z;owWPBg&8pywaD7q0x?6W)aZ_FKJJ#vfsr@#;_gwUFmwW|i@1;5x4$*m z8my(i-6h}=KVW&62dNW{CU$N*T#*@N4L%nm1P@_zuB@30)`{*;nx^0S>0w|_p{P<# z=-aJK_1fZlMBdqUytL|#YzY~&!uJpZgiO)YeJ=2FY{&T|*wE;MykshoW80-U#a^79 zIKyO1TmP}7)zQIY)f@cD_1ZcejMn-pslmaure4$X9%2&x!?8w`hD&YpN-q+`1*AOX zH%zry6UmD2&&bYuBCg!ZJ2qN&d5?*`DUVvMC7HTMAg9+i)O=P%f@tHF)FDu->nA=N z+HK6&>?Vzl5!^{p*Kj6SN+TxKA0`UmmCLk$<- zC>y4wTQ4>-ST94>CkVR};l!1@O}2pIyGX)|&XG$CrNyRH*{e)_VNZ0c)SGN*wbR_8 z3qbi(`gV*Qc?(JgfBG;m+Rl{I_WVYttzrAGjRr$v3f++e`7cZC=Ghq;6Q4ReVq_ zSE%5RhKwB?nc5k{tz@qF_MN(4t)`i8L<#c{5xkV#XdL>gA=3{+5V*$w8XQ?^%{#s1 zU!dN}7mawuK(=EzTH|F_Se(y)Onb^>()HK zL;s5>E?2O_Y}BTOHwf!-Wze4DV@vuiZ_+U^{q)UaT8|iBCw-&AZ4|8~WxJ&!FyA>nh z9@D2-8t2XRuO1f?V+L|u5f{*4HYkq;=mCG08yLT@D9w1>rCJkhH!;xv5o2TdYTVJX<)J8INY=)#VC!_?&O5Pl*~yEu{$=%JyfT(+ zxH_!w^xG0JxU}MldT;K&Kv;VzZay$yC>2!Ub(2ZK|%LJezJR7)SkcQ))R}bTXXW}ZmyJJ!{Zbz zf^dG5cHJ~a2(`;AD=>NMs3GHXubXIT+hL^%Ve^Iq!fjO4VDdT3mE#)DgM#O&4W^dy zG=|Vg2sV=!rv3_h$#R7&q*@f7@R7x3bSWyceb-2j+$iy68OJ&KKJGGhD5Np4mS)ho zFTE;DbJA43LMy&qe3v0pkgAU=nbKYYCYfxjNJcKKIn3Y(?Rmz8V~4>s>8?izu~b=Y zTZH0S4L)4W0IP_EJN&eiqSbU86tn)CriN|Rysz+xg=MIzEM3!u(@(`?PvC&`bKZ2S zA&|r+7fu~WQAuEOK!+JIk|!AtVvYKlC&T$=>KSr=$+@^~sJ-Up17GCiXThl>NymSY zd53qhr<903KU8@o3aIZlwaweI+cCFDoaNF+9LiPQ_>Ud zX<&?6Tw12~WbnC6T!L|_hI0eg7(?pgQQl;*LWAk2m}QUQB;otpboAD-)*lxFV;4@P zv*o!eSfl}JbxTV=|I|1V;BvF^1oNExm~k%zFyU+9$LQSsXly zJ3R+0X?;xW?s-qsR?o#sEq_3yb0IuI7))t{@ETN$;X>^1D2$ID5C~hRUtVn4SpN(F z1+vrKjOS)1RPj;Mz(7vq9U*fUD+JL&oVN={jbfs9^QfqV!6Ut?-Yr|^0Vli9Gz@Vn zrU&k?Xoo*U7oGKuA>82Q>B_8!i$U{|vqvKUF`-g`%tpcXO11w8f)MJdpFJ2Zy9*V?^8U`U|QYIWI# zCj9=At?*2el3H?RFg;>V@5>-GED^T0%+BSvVOv8{j`(FgfY}!#H%o{~2HODK^}%+2qpzid&zr~g)61;28W zMvSBr^CSA-oYD7uhZMgTd8o->V}Mlim)G>0LjMSSsK{S~mF^GVKkD<3=nnPxYdHKB zy8oii-)H#t%Q{r!ukq|3X85-@f1l{vL#_2S3u(l0 + 4.0.0 + + org.neo4j.driver + test-procedures + 5.7-SNAPSHOT + + + UTF-8 + UTF-8 + 11 + + 'v'yyyyMMdd-HHmm + true + 4.4.18 + + + + + org.neo4j + neo4j + ${neo4j.version} + provided + + + + + + + maven-compiler-plugin + 3.10.1 + + + com.diffplug.spotless + spotless-maven-plugin + 2.23.0 + + + + check + + + + + + + + + + + + \ No newline at end of file diff --git a/test-procedures/src/main/java/org/neo4j/driver/LongRunningProcedures.java b/test-procedures/src/main/java/org/neo4j/driver/LongRunningProcedures.java new file mode 100644 index 0000000000..e2cef1a25e --- /dev/null +++ b/test-procedures/src/main/java/org/neo4j/driver/LongRunningProcedures.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.stream.LongStream; +import java.util.stream.Stream; +import org.neo4j.graphdb.Transaction; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; + +public class LongRunningProcedures { + @Context + public Log log; + + @Context + public Transaction tx; + + public LongRunningProcedures() {} + + @Procedure("test.driver.longRunningStatement") + public void longRunningStatement(@Name("seconds") long seconds) { + final long start = System.currentTimeMillis(); + + while (System.currentTimeMillis() <= start + seconds * 1000L) { + long count = 0; + try { + Thread.sleep(100L); + count = nodeCount(); // Fails if transaction is terminated + } catch (InterruptedException e) { + this.log.error(e.getMessage() + " (last node count " + count + ")", e); + } + } + } + + @Procedure("test.driver.longStreamingResult") + public Stream longStreamingResult(@Name("seconds") long seconds) { + return LongStream.range(0L, seconds * 100L) + .map((x) -> { + if (x == 0L) { + return x; + } else { + try { + Thread.sleep(10L); + } catch (InterruptedException var4) { + this.log.error(var4.getMessage(), var4); + } + + nodeCount(); // Fails if transaction is terminated + return x; + } + }) + .mapToObj(l -> new Output(l)); + } + + private long nodeCount() { + return tx.getAllNodes().stream().count(); + } + + public static class Output { + public final Long out; + + public Output(long value) { + this.out = value; + } + } +}