diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ResponseQueueHanlder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ResponseQueueHanlder.java index f660e72c61..3c6af437bf 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ResponseQueueHanlder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ResponseQueueHanlder.java @@ -26,28 +26,28 @@ public class ResponseQueueHanlder { private final Consumer responseWriter; private final Queue responseQueue = new ArrayDeque<>(); - private boolean responseReady; + private int requestCount; ResponseQueueHanlder(Consumer responseWriter) { this.responseWriter = responseWriter; } - public synchronized void setResponseReadyAndDispatchFirst() { - responseReady = true; - dispatchFirst(); + public synchronized void increaseRequestCountAndDispatchFirstResponse() { + requestCount++; + dispatchFirstResponse(); } - public synchronized void offerAndDispatchFirst(TestkitResponse response) { + public synchronized void offerAndDispatchFirstResponse(TestkitResponse response) { responseQueue.offer(response); - if (responseReady) { - dispatchFirst(); + if (requestCount > 0) { + dispatchFirstResponse(); } } - private synchronized void dispatchFirst() { + private synchronized void dispatchFirstResponse() { var response = responseQueue.poll(); if (response != null) { - responseReady = false; + requestCount--; responseWriter.accept(response); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java index 310b99283a..867e1f390d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -83,7 +83,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { .exceptionally(this::createErrorResponse) .whenComplete((response, ignored) -> { if (response != null) { - responseQueueHanlder.offerAndDispatchFirst(response); + responseQueueHanlder.offerAndDispatchFirstResponse(response); } }); } catch (Throwable throwable) { @@ -106,7 +106,7 @@ private static CompletionStage wrapSyncRequest( @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { var response = createErrorResponse(cause); - responseQueueHanlder.offerAndDispatchFirst(response); + responseQueueHanlder.offerAndDispatchFirstResponse(response); } private TestkitResponse createErrorResponse(Throwable throwable) { @@ -170,7 +170,7 @@ private void writeAndFlush(TestkitResponse response) { if (channel == null) { throw new IllegalStateException("Called before channel is initialized"); } - responseQueueHanlder.offerAndDispatchFirst(response); + responseQueueHanlder.offerAndDispatchFirstResponse(response); } public enum BackendMode { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java index 7d2a7e9a4e..d3ec4a478c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java @@ -44,7 +44,7 @@ public TestkitRequestResponseMapperHandler(Logging logging, ResponseQueueHanlder public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { var testkitMessage = (String) msg; log.debug("Inbound Testkit message '%s'", testkitMessage.trim()); - responseQueueHanlder.setResponseReadyAndDispatchFirst(); + responseQueueHanlder.increaseRequestCountAndDispatchFirstResponse(); var testkitRequest = objectMapper.readValue(testkitMessage, TestkitRequest.class); ctx.fireChannelRead(testkitRequest); }