diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 99d0d4e306..dbc08d2cfd 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -71,7 +71,6 @@ public void channelInactive(ChannelHandlerContext ctx) { // it is most likely inactive because actual network connection broke or was explicitly closed by the driver messageDispatcher.handleChannelInactive(error); - ctx.channel().close(); } else { fail(error); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 0960460ed7..20c9a030b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -168,6 +168,10 @@ public void handleChannelInactive(Throwable cause) { if (!gracefullyClosed) { handleChannelError(cause); } else { + while (!handlers.isEmpty()) { + ResponseHandler handler = removeHandler(); + handler.onFailure(cause); + } channel.close(); } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java b/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java index 4d0cd0e845..a5b6b46b72 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java @@ -18,15 +18,20 @@ */ package org.neo4j.driver.integration; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.neo4j.driver.SessionConfig.builder; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Config; import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; import org.neo4j.driver.Session; +import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; @@ -84,6 +89,23 @@ void useSessionAfterDriverIsClosed() { assertThrows(IllegalStateException.class, () -> session.run("CREATE ()")); } + @Test + void shouldInterruptStreamConsumptionAndEndRetriesOnDriverClosure() { + var fetchSize = 5; + var config = Config.builder().withFetchSize(fetchSize).build(); + var driver = GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager(), config); + var session = driver.session(); + + var exception = assertThrows( + IllegalStateException.class, + () -> session.executeRead(tx -> { + var result = tx.run("UNWIND range(0, $limit) AS x RETURN x", Map.of("limit", fetchSize * 3)); + CompletableFuture.runAsync(driver::close); + return result.list(); + })); + assertEquals(ConnectionPool.CONNECTION_POOL_CLOSED_ERROR_MESSAGE, exception.getMessage()); + } + private static Driver createDriver() { return GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager()); }