Skip to content

Commit 9ecd1ed

Browse files
authored
Notify handlers of a failure on graceful pool closure (#1442) (#1443)
1 parent 37c04b4 commit 9ecd1ed

File tree

3 files changed

+30
-1
lines changed

3 files changed

+30
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public void channelInactive(ChannelHandlerContext ctx) {
7171
// it is most likely inactive because actual network connection broke or was explicitly closed by the driver
7272

7373
messageDispatcher.handleChannelInactive(error);
74-
ctx.channel().close();
7574
} else {
7675
fail(error);
7776
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public void handleChannelInactive(Throwable cause) {
153153
if (!gracefullyClosed) {
154154
handleChannelError(cause);
155155
} else {
156+
while (!handlers.isEmpty()) {
157+
ResponseHandler handler = removeHandler();
158+
handler.onFailure(cause);
159+
}
156160
channel.close();
157161
}
158162
}

driver/src/test/java/org/neo4j/driver/integration/DriverCloseIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@
1818
*/
1919
package org.neo4j.driver.integration;
2020

21+
import static org.junit.jupiter.api.Assertions.assertEquals;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
2223
import static org.neo4j.driver.SessionConfig.builder;
2324

25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
2428
import org.junit.jupiter.api.Test;
2529
import org.junit.jupiter.api.extension.RegisterExtension;
2630
import org.neo4j.driver.AccessMode;
31+
import org.neo4j.driver.Config;
2732
import org.neo4j.driver.Driver;
2833
import org.neo4j.driver.GraphDatabase;
34+
import org.neo4j.driver.Result;
2935
import org.neo4j.driver.Session;
36+
import org.neo4j.driver.internal.spi.ConnectionPool;
3037
import org.neo4j.driver.util.DatabaseExtension;
3138
import org.neo4j.driver.util.ParallelizableIT;
3239

@@ -84,6 +91,25 @@ void useSessionAfterDriverIsClosed() {
8491
assertThrows(IllegalStateException.class, () -> session.run("CREATE ()"));
8592
}
8693

94+
@Test
95+
void shouldInterruptStreamConsumptionAndEndRetriesOnDriverClosure() {
96+
int fetchSize = 5;
97+
Config config = Config.builder().withFetchSize(fetchSize).build();
98+
Driver driver = GraphDatabase.driver(neo4j.uri(), neo4j.authToken(), config);
99+
Session session = driver.session();
100+
101+
IllegalStateException exception = assertThrows(
102+
IllegalStateException.class,
103+
() -> session.readTransaction(tx -> {
104+
Map<String, Object> parameters = new HashMap<>();
105+
parameters.put("limit", fetchSize * 3);
106+
Result result = tx.run("UNWIND range(0, $limit) AS x RETURN x", parameters);
107+
CompletableFuture.runAsync(driver::close);
108+
return result.list();
109+
}));
110+
assertEquals(ConnectionPool.CONNECTION_POOL_CLOSED_ERROR_MESSAGE, exception.getMessage());
111+
}
112+
87113
private static Driver createDriver() {
88114
return GraphDatabase.driver(neo4j.uri(), neo4j.authToken());
89115
}

0 commit comments

Comments
 (0)