Skip to content

Commit f1f56c8

Browse files
committed
Notify handlers of a failure on graceful pool closure
1 parent 92b4f3b commit f1f56c8

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public void channelInactive(ChannelHandlerContext ctx) {
7171
// it is most likely inactive because actual network connection broke or was explicitly closed by the driver
7272

7373
messageDispatcher.handleChannelInactive(error);
74-
ctx.channel().close();
7574
} else {
7675
fail(error);
7776
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ public void handleChannelInactive(Throwable cause) {
168168
if (!gracefullyClosed) {
169169
handleChannelError(cause);
170170
} else {
171+
while (!handlers.isEmpty()) {
172+
ResponseHandler handler = removeHandler();
173+
handler.onFailure(cause);
174+
}
171175
channel.close();
172176
}
173177
}

driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,25 @@
1818
*/
1919
package org.neo4j.driver.integration;
2020

21+
import static org.junit.jupiter.api.Assertions.assertEquals;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
2224
import static org.neo4j.driver.SessionConfig.builder;
2325

2426
import org.junit.jupiter.api.Test;
2527
import org.junit.jupiter.api.extension.RegisterExtension;
2628
import org.neo4j.driver.AccessMode;
29+
import org.neo4j.driver.Config;
2730
import org.neo4j.driver.Driver;
2831
import org.neo4j.driver.GraphDatabase;
2932
import org.neo4j.driver.Session;
33+
import org.neo4j.driver.internal.spi.ConnectionPool;
3034
import org.neo4j.driver.testutil.DatabaseExtension;
3135
import org.neo4j.driver.testutil.ParallelizableIT;
3236

37+
import java.util.Map;
38+
import java.util.concurrent.CompletableFuture;
39+
3340
@ParallelizableIT
3441
class DriverCloseIT {
3542
@RegisterExtension
@@ -84,6 +91,23 @@ void useSessionAfterDriverIsClosed() {
8491
assertThrows(IllegalStateException.class, () -> session.run("CREATE ()"));
8592
}
8693

94+
@Test
95+
void shouldInterruptStreamConsumptionAndEndRetriesOnDriverClosure() {
96+
var fetchSize = 5;
97+
var config = Config.builder()
98+
.withFetchSize(fetchSize)
99+
.build();
100+
var driver = GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager(), config);
101+
var session = driver.session();
102+
103+
var exception = assertThrows(IllegalStateException.class, () -> session.executeRead(tx -> {
104+
var result = tx.run("UNWIND range(0, $limit) AS x RETURN x", Map.of("limit", fetchSize * 3));
105+
CompletableFuture.runAsync(driver::close);
106+
return result.list();
107+
}));
108+
assertEquals(ConnectionPool.CONNECTION_POOL_CLOSED_ERROR_MESSAGE, exception.getMessage());
109+
}
110+
87111
private static Driver createDriver() {
88112
return GraphDatabase.driver(neo4j.uri(), neo4j.authTokenManager());
89113
}

0 commit comments

Comments
 (0)