diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 99d0d4e306..dbc08d2cfd 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -71,7 +71,6 @@ public void channelInactive(ChannelHandlerContext ctx) { // it is most likely inactive because actual network connection broke or was explicitly closed by the driver messageDispatcher.handleChannelInactive(error); - ctx.channel().close(); } else { fail(error); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 8f5e71470a..ba84b7853e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -153,6 +153,10 @@ public void handleChannelInactive(Throwable cause) { if (!gracefullyClosed) { handleChannelError(cause); } else { + while (!handlers.isEmpty()) { + ResponseHandler handler = removeHandler(); + handler.onFailure(cause); + } channel.close(); } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java b/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java index 93c27f75d8..cea0df1870 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java @@ -18,15 +18,22 @@ */ package org.neo4j.driver.integration; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.neo4j.driver.SessionConfig.builder; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Config; import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; +import org.neo4j.driver.Result; import org.neo4j.driver.Session; +import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.util.DatabaseExtension; import org.neo4j.driver.util.ParallelizableIT; @@ -84,6 +91,25 @@ void useSessionAfterDriverIsClosed() { assertThrows(IllegalStateException.class, () -> session.run("CREATE ()")); } + @Test + void shouldInterruptStreamConsumptionAndEndRetriesOnDriverClosure() { + int fetchSize = 5; + Config config = Config.builder().withFetchSize(fetchSize).build(); + Driver driver = GraphDatabase.driver(neo4j.uri(), neo4j.authToken(), config); + Session session = driver.session(); + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> session.readTransaction(tx -> { + Map parameters = new HashMap<>(); + parameters.put("limit", fetchSize * 3); + Result result = tx.run("UNWIND range(0, $limit) AS x RETURN x", parameters); + CompletableFuture.runAsync(driver::close); + return result.list(); + })); + assertEquals(ConnectionPool.CONNECTION_POOL_CLOSED_ERROR_MESSAGE, exception.getMessage()); + } + private static Driver createDriver() { return GraphDatabase.driver(neo4j.uri(), neo4j.authToken()); }