18
18
*/
19
19
package neo4j .org .testkit .backend .channel .handler ;
20
20
21
+ import static org .neo4j .driver .internal .util .LockUtil .executeWithLock ;
22
+
21
23
import io .netty .channel .Channel ;
22
24
import io .netty .channel .ChannelHandlerContext ;
23
25
import io .netty .channel .ChannelInboundHandlerAdapter ;
24
26
import java .time .zone .ZoneRulesException ;
27
+ import java .util .ArrayDeque ;
28
+ import java .util .Queue ;
25
29
import java .util .concurrent .CompletableFuture ;
26
30
import java .util .concurrent .CompletionException ;
27
31
import java .util .concurrent .CompletionStage ;
28
32
import java .util .concurrent .Executor ;
29
33
import java .util .concurrent .Executors ;
34
+ import java .util .concurrent .locks .Lock ;
35
+ import java .util .concurrent .locks .ReentrantLock ;
30
36
import java .util .function .BiFunction ;
37
+ import java .util .function .Consumer ;
31
38
import neo4j .org .testkit .backend .CustomDriverError ;
32
39
import neo4j .org .testkit .backend .FrontendError ;
33
40
import neo4j .org .testkit .backend .TestkitState ;
@@ -47,6 +54,9 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
47
54
private final BiFunction <TestkitRequest , TestkitState , CompletionStage <TestkitResponse >> processorImpl ;
48
55
// Some requests require multiple threads
49
56
private final Executor requestExecutorService = Executors .newFixedThreadPool (10 );
57
+ private final Lock lock = new ReentrantLock ();
58
+ private final Queue <TestkitResponse > responseQueue = new ArrayDeque <>();
59
+ private boolean canDispatchResponse = false ;
50
60
private Channel channel ;
51
61
52
62
public TestkitRequestProcessorHandler (BackendMode backendMode , Logging logging ) {
@@ -71,15 +81,26 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
71
81
public void channelRead (ChannelHandlerContext ctx , Object msg ) {
72
82
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like
73
83
// resolvers support, is blocking.
84
+ executeWithLock (lock , () -> {
85
+ canDispatchResponse = true ;
86
+ trySending (ctx ::writeAndFlush );
87
+ });
74
88
requestExecutorService .execute (() -> {
75
89
try {
76
90
var request = (TestkitRequest ) msg ;
77
91
var responseStage = processorImpl .apply (request , testkitState );
78
92
responseStage .whenComplete ((response , throwable ) -> {
79
93
if (throwable != null ) {
80
- ctx .writeAndFlush (createErrorResponse (throwable ));
94
+ var errorResponse = createErrorResponse (throwable );
95
+ executeWithLock (lock , () -> {
96
+ responseQueue .offer (errorResponse );
97
+ trySending (ctx ::writeAndFlush );
98
+ });
81
99
} else if (response != null ) {
82
- ctx .writeAndFlush (response );
100
+ executeWithLock (lock , () -> {
101
+ responseQueue .offer (response );
102
+ trySending (ctx ::writeAndFlush );
103
+ });
83
104
}
84
105
});
85
106
} catch (Throwable throwable ) {
@@ -88,6 +109,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
88
109
});
89
110
}
90
111
112
+ private void trySending (Consumer <TestkitResponse > responseWriter ) {
113
+ if (canDispatchResponse && !responseQueue .isEmpty ()) {
114
+ canDispatchResponse = false ;
115
+ responseWriter .accept (responseQueue .poll ());
116
+ }
117
+ }
118
+
91
119
private static CompletionStage <TestkitResponse > wrapSyncRequest (
92
120
TestkitRequest testkitRequest , TestkitState testkitState ) {
93
121
var result = new CompletableFuture <TestkitResponse >();
@@ -101,7 +129,11 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
101
129
102
130
@ Override
103
131
public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
104
- ctx .writeAndFlush (createErrorResponse (cause ));
132
+ var response = createErrorResponse (cause );
133
+ executeWithLock (lock , () -> {
134
+ responseQueue .offer (response );
135
+ trySending (ctx ::writeAndFlush );
136
+ });
105
137
}
106
138
107
139
private TestkitResponse createErrorResponse (Throwable throwable ) {
@@ -165,7 +197,10 @@ private void writeAndFlush(TestkitResponse response) {
165
197
if (channel == null ) {
166
198
throw new IllegalStateException ("Called before channel is initialized" );
167
199
}
168
- channel .writeAndFlush (response );
200
+ executeWithLock (lock , () -> {
201
+ responseQueue .offer (response );
202
+ trySending (channel ::writeAndFlush );
203
+ });
169
204
}
170
205
171
206
public enum BackendMode {
0 commit comments