Skip to content

Commit 2d160d3

Browse files
committed
Update TestkitRequestProcessorHandler to adhere to req/res pattern
This update aims to make sure the Teskit backend does not break the request / response pattern. For instance, by sending 2 responses together in case of internal callbacks.
1 parent e3d1080 commit 2d160d3

File tree

1 file changed

+43
-8
lines changed

1 file changed

+43
-8
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import io.netty.channel.ChannelHandlerContext;
2323
import io.netty.channel.ChannelInboundHandlerAdapter;
2424
import java.time.zone.ZoneRulesException;
25+
import java.util.ArrayDeque;
26+
import java.util.Queue;
2527
import java.util.concurrent.CompletableFuture;
2628
import java.util.concurrent.CompletionException;
2729
import java.util.concurrent.CompletionStage;
2830
import java.util.concurrent.Executor;
2931
import java.util.concurrent.Executors;
3032
import java.util.function.BiFunction;
33+
import java.util.function.Consumer;
3134
import neo4j.org.testkit.backend.CustomDriverError;
3235
import neo4j.org.testkit.backend.FrontendError;
3336
import neo4j.org.testkit.backend.TestkitState;
@@ -48,6 +51,7 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4851
// Some requests require multiple threads
4952
private final Executor requestExecutorService = Executors.newFixedThreadPool(10);
5053
private Channel channel;
54+
private ResponseQueueHanlder responseQueueHanlder;
5155

5256
public TestkitRequestProcessorHandler(BackendMode backendMode, Logging logging) {
5357
switch (backendMode) {
@@ -64,22 +68,22 @@ public TestkitRequestProcessorHandler(BackendMode backendMode, Logging logging)
6468
@Override
6569
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
6670
channel = ctx.channel();
71+
responseQueueHanlder = new ResponseQueueHanlder(channel::writeAndFlush);
6772
super.channelRegistered(ctx);
6873
}
6974

7075
@Override
7176
public void channelRead(ChannelHandlerContext ctx, Object msg) {
7277
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like
7378
// resolvers support, is blocking.
79+
responseQueueHanlder.setResponseReadyAndDispatchFirst();
7480
requestExecutorService.execute(() -> {
7581
try {
7682
var request = (TestkitRequest) msg;
77-
var responseStage = processorImpl.apply(request, testkitState);
78-
responseStage.whenComplete((response, throwable) -> {
79-
if (throwable != null) {
80-
ctx.writeAndFlush(createErrorResponse(throwable));
81-
} else if (response != null) {
82-
ctx.writeAndFlush(response);
83+
processorImpl.apply(request, testkitState).whenComplete((response, throwable) -> {
84+
var testkitResponse = throwable != null ? createErrorResponse(throwable) : response;
85+
if (testkitResponse != null) {
86+
responseQueueHanlder.offerAndDispatchFirst(testkitResponse);
8387
}
8488
});
8589
} catch (Throwable throwable) {
@@ -101,7 +105,8 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
101105

102106
@Override
103107
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
104-
ctx.writeAndFlush(createErrorResponse(cause));
108+
var response = createErrorResponse(cause);
109+
responseQueueHanlder.offerAndDispatchFirst(response);
105110
}
106111

107112
private TestkitResponse createErrorResponse(Throwable throwable) {
@@ -165,7 +170,7 @@ private void writeAndFlush(TestkitResponse response) {
165170
if (channel == null) {
166171
throw new IllegalStateException("Called before channel is initialized");
167172
}
168-
channel.writeAndFlush(response);
173+
responseQueueHanlder.offerAndDispatchFirst(response);
169174
}
170175

171176
public enum BackendMode {
@@ -174,4 +179,34 @@ public enum BackendMode {
174179
REACTIVE_LEGACY,
175180
REACTIVE
176181
}
182+
183+
private static class ResponseQueueHanlder {
184+
private final Consumer<TestkitResponse> responseWriter;
185+
private final Queue<TestkitResponse> responseQueue = new ArrayDeque<>();
186+
private boolean responseReady;
187+
188+
ResponseQueueHanlder(Consumer<TestkitResponse> responseWriter) {
189+
this.responseWriter = responseWriter;
190+
}
191+
192+
synchronized void setResponseReadyAndDispatchFirst() {
193+
responseReady = true;
194+
dispatchFirst();
195+
}
196+
197+
synchronized void offerAndDispatchFirst(TestkitResponse response) {
198+
responseQueue.offer(response);
199+
if (responseReady) {
200+
dispatchFirst();
201+
}
202+
}
203+
204+
private synchronized void dispatchFirst() {
205+
var response = responseQueue.poll();
206+
if (response != null) {
207+
responseReady = false;
208+
responseWriter.accept(responseQueue.poll());
209+
}
210+
}
211+
}
177212
}

0 commit comments

Comments
 (0)