Skip to content

Commit 3bbc806

Browse files
committed
Update TestkitRequestProcessorHandler to adhere to req/res pattern
This update aims to make sure the Teskit backend does not break the request / response pattern. For instance, by sending 2 responses together in case of internal callbacks.
1 parent 30a5123 commit 3bbc806

File tree

4 files changed

+82
-16
lines changed

4 files changed

+82
-16
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import java.util.ArrayDeque;
22+
import java.util.Queue;
23+
import java.util.function.Consumer;
24+
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
25+
26+
public class ResponseQueueHanlder {
27+
private final Consumer<TestkitResponse> responseWriter;
28+
private final Queue<TestkitResponse> responseQueue = new ArrayDeque<>();
29+
private boolean responseReady;
30+
31+
ResponseQueueHanlder(Consumer<TestkitResponse> responseWriter) {
32+
this.responseWriter = responseWriter;
33+
}
34+
35+
public synchronized void setResponseReadyAndDispatchFirst() {
36+
responseReady = true;
37+
dispatchFirst();
38+
}
39+
40+
public synchronized void offerAndDispatchFirst(TestkitResponse response) {
41+
responseQueue.offer(response);
42+
if (responseReady) {
43+
dispatchFirst();
44+
}
45+
}
46+
47+
private synchronized void dispatchFirst() {
48+
var response = responseQueue.poll();
49+
if (response != null) {
50+
responseReady = false;
51+
responseWriter.accept(response);
52+
}
53+
}
54+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ public static void main(String[] args) throws InterruptedException {
4949
.childHandler(new ChannelInitializer<SocketChannel>() {
5050
@Override
5151
protected void initChannel(SocketChannel channel) {
52+
var responseQueueHanlder = new ResponseQueueHanlder(channel::writeAndFlush);
5253
channel.pipeline().addLast(new TestkitMessageInboundHandler());
5354
channel.pipeline().addLast(new TestkitMessageOutboundHandler());
54-
channel.pipeline().addLast(new TestkitRequestResponseMapperHandler(logging));
55-
channel.pipeline().addLast(new TestkitRequestProcessorHandler(backendMode, logging));
55+
channel.pipeline()
56+
.addLast(new TestkitRequestResponseMapperHandler(logging, responseQueueHanlder));
57+
channel.pipeline()
58+
.addLast(new TestkitRequestProcessorHandler(
59+
backendMode, logging, responseQueueHanlder));
5660
}
5761
});
5862
var server = bootstrap.bind().sync();

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.function.BiFunction;
3131
import neo4j.org.testkit.backend.CustomDriverError;
3232
import neo4j.org.testkit.backend.FrontendError;
33+
import neo4j.org.testkit.backend.ResponseQueueHanlder;
3334
import neo4j.org.testkit.backend.TestkitState;
3435
import neo4j.org.testkit.backend.messages.requests.TestkitRequest;
3536
import neo4j.org.testkit.backend.messages.responses.BackendError;
@@ -47,9 +48,11 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4748
private final BiFunction<TestkitRequest, TestkitState, CompletionStage<TestkitResponse>> processorImpl;
4849
// Some requests require multiple threads
4950
private final Executor requestExecutorService = Executors.newFixedThreadPool(10);
51+
private final ResponseQueueHanlder responseQueueHanlder;
5052
private Channel channel;
5153

52-
public TestkitRequestProcessorHandler(BackendMode backendMode, Logging logging) {
54+
public TestkitRequestProcessorHandler(
55+
BackendMode backendMode, Logging logging, ResponseQueueHanlder responseQueueHanlder) {
5356
switch (backendMode) {
5457
case ASYNC -> processorImpl = TestkitRequest::processAsync;
5558
case REACTIVE_LEGACY -> processorImpl =
@@ -59,6 +62,7 @@ public TestkitRequestProcessorHandler(BackendMode backendMode, Logging logging)
5962
default -> processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
6063
}
6164
testkitState = new TestkitState(this::writeAndFlush, logging);
65+
this.responseQueueHanlder = responseQueueHanlder;
6266
}
6367

6468
@Override
@@ -74,14 +78,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
7478
requestExecutorService.execute(() -> {
7579
try {
7680
var request = (TestkitRequest) msg;
77-
var responseStage = processorImpl.apply(request, testkitState);
78-
responseStage.whenComplete((response, throwable) -> {
79-
if (throwable != null) {
80-
ctx.writeAndFlush(createErrorResponse(throwable));
81-
} else if (response != null) {
82-
ctx.writeAndFlush(response);
83-
}
84-
});
81+
processorImpl
82+
.apply(request, testkitState)
83+
.exceptionally(this::createErrorResponse)
84+
.whenComplete((response, ignored) -> {
85+
if (response != null) {
86+
responseQueueHanlder.offerAndDispatchFirst(response);
87+
}
88+
});
8589
} catch (Throwable throwable) {
8690
exceptionCaught(ctx, throwable);
8791
}
@@ -101,7 +105,8 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
101105

102106
@Override
103107
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
104-
ctx.writeAndFlush(createErrorResponse(cause));
108+
var response = createErrorResponse(cause);
109+
responseQueueHanlder.offerAndDispatchFirst(response);
105110
}
106111

107112
private TestkitResponse createErrorResponse(Throwable throwable) {
@@ -165,7 +170,7 @@ private void writeAndFlush(TestkitResponse response) {
165170
if (channel == null) {
166171
throw new IllegalStateException("Called before channel is initialized");
167172
}
168-
channel.writeAndFlush(response);
173+
responseQueueHanlder.offerAndDispatchFirst(response);
169174
}
170175

171176
public enum BackendMode {

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.channel.ChannelDuplexHandler;
2424
import io.netty.channel.ChannelHandlerContext;
2525
import io.netty.channel.ChannelPromise;
26+
import neo4j.org.testkit.backend.ResponseQueueHanlder;
2627
import neo4j.org.testkit.backend.messages.TestkitModule;
2728
import neo4j.org.testkit.backend.messages.requests.TestkitRequest;
2829
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
@@ -32,17 +33,19 @@
3233
public class TestkitRequestResponseMapperHandler extends ChannelDuplexHandler {
3334
private final Logger log;
3435
private final ObjectMapper objectMapper = newObjectMapper();
36+
private final ResponseQueueHanlder responseQueueHanlder;
3537

36-
public TestkitRequestResponseMapperHandler(Logging logging) {
38+
public TestkitRequestResponseMapperHandler(Logging logging, ResponseQueueHanlder responseQueueHanlder) {
3739
log = logging.getLog(getClass());
40+
this.responseQueueHanlder = responseQueueHanlder;
3841
}
3942

4043
@Override
4144
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
4245
var testkitMessage = (String) msg;
4346
log.debug("Inbound Testkit message '%s'", testkitMessage.trim());
44-
TestkitRequest testkitRequest;
45-
testkitRequest = objectMapper.readValue(testkitMessage, TestkitRequest.class);
47+
responseQueueHanlder.setResponseReadyAndDispatchFirst();
48+
var testkitRequest = objectMapper.readValue(testkitMessage, TestkitRequest.class);
4649
ctx.fireChannelRead(testkitRequest);
4750
}
4851

0 commit comments

Comments
 (0)