Skip to content

Commit 52aef12

Browse files
authored
Notify handlers of a failure on graceful pool closure (#1442)
1 parent 92b4f3b commit 52aef12

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public void channelInactive(ChannelHandlerContext ctx) {
7171
// it is most likely inactive because actual network connection broke or was explicitly closed by the driver
7272

7373
messageDispatcher.handleChannelInactive(error);
74-
ctx.channel().close();
7574
} else {
7675
fail(error);
7776
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ public void handleChannelInactive(Throwable cause) {
168168
if (!gracefullyClosed) {
169169
handleChannelError(cause);
170170
} else {
171+
while (!handlers.isEmpty()) {
172+
ResponseHandler handler = removeHandler();
173+
handler.onFailure(cause);
174+
}
171175
channel.close();
172176
}
173177
}

driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,20 @@
1818
*/
1919
package org.neo4j.driver.integration;
2020

21+
import static org.junit.jupiter.api.Assertions.assertEquals;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
2223
import static org.neo4j.driver.SessionConfig.builder;
2324

25+
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
2427
import org.junit.jupiter.api.Test;
2528
import org.junit.jupiter.api.extension.RegisterExtension;
2629
import org.neo4j.driver.AccessMode;
30+
import org.neo4j.driver.Config;
2731
import org.neo4j.driver.Driver;
2832
import org.neo4j.driver.GraphDatabase;
2933
import org.neo4j.driver.Session;
34+
import org.neo4j.driver.internal.spi.ConnectionPool;
3035
import org.neo4j.driver.testutil.DatabaseExtension;
3136
import org.neo4j.driver.testutil.ParallelizableIT;
3237

@@ -84,6 +89,23 @@ void useSessionAfterDriverIsClosed() {
8489
assertThrows(IllegalStateException.class, () -> session.run("CREATE ()"));
8590
}
8691

92+
@Test
93+
void shouldInterruptStreamConsumptionAndEndRetriesOnDriverClosure() {
94+
var fetchSize = 5;
95+
var config = Config.builder().withFetchSize(fetchSize).build();
96+
var driver = GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager(), config);
97+
var session = driver.session();
98+
99+
var exception = assertThrows(
100+
IllegalStateException.class,
101+
() -> session.executeRead(tx -> {
102+
var result = tx.run("UNWIND range(0, $limit) AS x RETURN x", Map.of("limit", fetchSize * 3));
103+
CompletableFuture.runAsync(driver::close);
104+
return result.list();
105+
}));
106+
assertEquals(ConnectionPool.CONNECTION_POOL_CLOSED_ERROR_MESSAGE, exception.getMessage());
107+
}
108+
87109
private static Driver createDriver() {
88110
return GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager());
89111
}

0 commit comments

Comments
 (0)