Skip to content

Commit 6ee3936

Browse files
authored
Add reactive backend support (#998)
This update brings reactive backend support. Its scope is limited to providing partial transparent support for existing test cases. More updates are expected in future PRs.
1 parent 4238f82 commit 6ee3936

31 files changed

+660
-43
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,24 @@ public class Runner
3434
{
3535
public static void main( String[] args ) throws InterruptedException
3636
{
37-
boolean asyncMode = args.length > 0 && args[0].equals( "async" );
37+
TestkitRequestProcessorHandler.BackendMode backendMode;
38+
String modeArg = args.length > 0 ? args[0] : null;
39+
if ( "async".equals( modeArg ) )
40+
{
41+
backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC;
42+
}
43+
else if ( "reactive".equals( modeArg ) )
44+
{
45+
backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE;
46+
}
47+
else
48+
{
49+
backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC;
50+
}
51+
3852
EventLoopGroup group = new NioEventLoopGroup();
3953
try
54+
4055
{
4156
ServerBootstrap bootstrap = new ServerBootstrap();
4257
bootstrap.group( group )
@@ -50,7 +65,7 @@ protected void initChannel( SocketChannel channel )
5065
channel.pipeline().addLast( new TestkitMessageInboundHandler() );
5166
channel.pipeline().addLast( new TestkitMessageOutboundHandler() );
5267
channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() );
53-
channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) );
68+
channel.pipeline().addLast( new TestkitRequestProcessorHandler( backendMode ) );
5469
}
5570
} );
5671
ChannelFuture server = bootstrap.bind().sync();
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import lombok.Getter;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
27+
public class RxBlockingSubscriber<T> implements Subscriber<T>
28+
{
29+
@Getter
30+
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
31+
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;
32+
33+
public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
34+
{
35+
nextSignalConsumerFuture.complete( nextSignalConsumer );
36+
}
37+
38+
@Override
39+
public void onSubscribe( Subscription s )
40+
{
41+
nextSignalConsumerFuture = new CompletableFuture<>();
42+
subscriptionFuture.complete( s );
43+
}
44+
45+
@Override
46+
public void onNext( T t )
47+
{
48+
blockUntilNextSignalConsumer().complete( t );
49+
}
50+
51+
@Override
52+
public void onError( Throwable t )
53+
{
54+
blockUntilNextSignalConsumer().completeExceptionally( t );
55+
}
56+
57+
@Override
58+
public void onComplete()
59+
{
60+
blockUntilNextSignalConsumer().complete( null );
61+
}
62+
63+
private CompletableFuture<T> blockUntilNextSignalConsumer()
64+
{
65+
CompletableFuture<T> nextSignalConsumer;
66+
try
67+
{
68+
nextSignalConsumer = nextSignalConsumerFuture.get();
69+
}
70+
catch ( Throwable throwable )
71+
{
72+
throw new RuntimeException( "Failed waiting for next signal consumer", throwable );
73+
}
74+
nextSignalConsumerFuture = new CompletableFuture<>();
75+
return nextSignalConsumer;
76+
}
77+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import lombok.Getter;
22+
import lombok.Setter;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
26+
import org.neo4j.driver.reactive.RxSession;
27+
28+
@Getter
29+
@Setter
30+
public class RxSessionState
31+
{
32+
public RxSession session;
33+
public CompletableFuture<Void> txWorkFuture;
34+
35+
public RxSessionState( RxSession session )
36+
{
37+
this.session = session;
38+
}
39+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import lombok.Getter;
2323
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
2424
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
25+
import reactor.core.publisher.Mono;
2526

2627
import java.util.HashMap;
2728
import java.util.Map;
@@ -30,12 +31,15 @@
3031
import java.util.function.Consumer;
3132

3233
import org.neo4j.driver.Driver;
34+
import org.neo4j.driver.Record;
3335
import org.neo4j.driver.Result;
3436
import org.neo4j.driver.Transaction;
3537
import org.neo4j.driver.async.AsyncTransaction;
3638
import org.neo4j.driver.async.ResultCursor;
3739
import org.neo4j.driver.exceptions.Neo4jException;
3840
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
41+
import org.neo4j.driver.reactive.RxResult;
42+
import org.neo4j.driver.reactive.RxTransaction;
3943

4044
@Getter
4145
public class TestkitState
@@ -46,12 +50,17 @@ public class TestkitState
4650
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
4751
private final Map<String,SessionState> sessionStates = new HashMap<>();
4852
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
53+
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
4954
private final Map<String,Result> results = new HashMap<>();
5055
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
56+
private final Map<String,RxResult> rxResults = new HashMap<>();
57+
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
5158
@Getter( AccessLevel.NONE )
5259
private final Map<String,Transaction> transactions = new HashMap<>();
5360
@Getter( AccessLevel.NONE )
5461
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
62+
@Getter( AccessLevel.NONE )
63+
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
5564
private final Map<String,Neo4jException> errors = new HashMap<>();
5665
@Getter( AccessLevel.NONE )
5766
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
@@ -101,4 +110,20 @@ public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
101110
}
102111
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
103112
}
113+
114+
public String addRxTransaction( RxTransaction transaction )
115+
{
116+
String id = newId();
117+
this.rxTransactions.put( id, transaction );
118+
return id;
119+
}
120+
121+
public Mono<RxTransaction> getRxTransaction( String id )
122+
{
123+
if ( !this.rxTransactions.containsKey( id ) )
124+
{
125+
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
126+
}
127+
return Mono.just( rxTransactions.get( id ) );
128+
}
104129
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.CompletionException;
3232
import java.util.concurrent.CompletionStage;
33+
import java.util.concurrent.Executor;
34+
import java.util.concurrent.Executors;
3335
import java.util.function.BiFunction;
3436

3537
import org.neo4j.driver.exceptions.Neo4jException;
@@ -39,15 +41,24 @@
3941
public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4042
{
4143
private final TestkitState testkitState = new TestkitState( this::writeAndFlush );
42-
private final BiFunction<TestkitRequest, TestkitState, CompletionStage<TestkitResponse>> processorImpl;
44+
private final BiFunction<TestkitRequest,TestkitState,CompletionStage<TestkitResponse>> processorImpl;
45+
// Some requests require multiple threads
46+
private final Executor requestExecutorService = Executors.newFixedThreadPool( 10 );
4347
private Channel channel;
4448

45-
public TestkitRequestProcessorHandler( boolean asyncMode )
49+
public TestkitRequestProcessorHandler( BackendMode backendMode )
4650
{
47-
if (asyncMode) {
48-
processorImpl = (request, state) -> request.processAsync( state );
49-
} else {
51+
switch ( backendMode )
52+
{
53+
case ASYNC:
5054
processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
55+
break;
56+
case REACTIVE:
57+
processorImpl = ( request, state ) -> request.processRx( state ).toFuture();
58+
break;
59+
default:
60+
processorImpl = TestkitRequest::processAsync;
61+
break;
5162
}
5263
}
5364

@@ -62,20 +73,29 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
6273
public void channelRead( ChannelHandlerContext ctx, Object msg )
6374
{
6475
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
65-
CompletableFuture.supplyAsync( () -> (TestkitRequest) msg )
66-
.thenCompose( request -> processorImpl.apply( request, testkitState ) )
67-
.thenApply( response ->
68-
{
69-
if ( response != null )
70-
{
71-
ctx.writeAndFlush( response );
72-
}
73-
return null;
74-
} ).exceptionally( throwable ->
75-
{
76-
ctx.writeAndFlush( createErrorResponse( throwable ) );
77-
return null;
78-
} );
76+
requestExecutorService.execute( () ->
77+
{
78+
try
79+
{
80+
TestkitRequest request = (TestkitRequest) msg;
81+
CompletionStage<TestkitResponse> responseStage = processorImpl.apply( request, testkitState );
82+
responseStage.whenComplete( ( response, throwable ) ->
83+
{
84+
if ( throwable != null )
85+
{
86+
ctx.writeAndFlush( createErrorResponse( throwable ) );
87+
}
88+
else if ( response != null )
89+
{
90+
ctx.writeAndFlush( response );
91+
}
92+
} );
93+
}
94+
catch ( Throwable throwable )
95+
{
96+
ctx.writeAndFlush( createErrorResponse( throwable ) );
97+
}
98+
} );
7999
}
80100

81101
private static CompletionStage<TestkitResponse> wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState )
@@ -145,4 +165,11 @@ private void writeAndFlush( TestkitResponse response )
145165
}
146166
channel.writeAndFlush( response );
147167
}
168+
169+
public enum BackendMode
170+
{
171+
SYNC,
172+
ASYNC,
173+
REACTIVE
174+
}
148175
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.MultiDBSupport;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.concurrent.CompletionStage;
2829

@@ -48,6 +49,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
4849
.thenApply( this::createResponse );
4950
}
5051

52+
@Override
53+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
54+
{
55+
return Mono.fromCompletionStage( processAsync( testkitState ) );
56+
}
57+
5158
private MultiDBSupport createResponse( boolean available )
5259
{
5360
return MultiDBSupport.builder()

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.Driver;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.concurrent.CompletionStage;
2829

@@ -47,6 +48,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
4748
.thenApply( ignored -> createResponse() );
4849
}
4950

51+
@Override
52+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
53+
{
54+
return Mono.fromCompletionStage( processAsync( testkitState ) );
55+
}
56+
5057
private Driver createResponse()
5158
{
5259
return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build();

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.FeatureList;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.Arrays;
2829
import java.util.Collections;
@@ -62,6 +63,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
6263
return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) );
6364
}
6465

66+
@Override
67+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
68+
{
69+
return Mono.just( createResponse( COMMON_FEATURES ) );
70+
}
71+
6572
private FeatureList createResponse( Set<String> features )
6673
{
6774
return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build();

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.RoutingTable;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.Arrays;
2829
import java.util.List;
@@ -78,7 +79,13 @@ public TestkitResponse process( TestkitState testkitState )
7879
@Override
7980
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
8081
{
81-
return CompletableFuture.completedFuture( process( testkitState ) ) ;
82+
return CompletableFuture.completedFuture( process( testkitState ) );
83+
}
84+
85+
@Override
86+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
87+
{
88+
return Mono.just( process( testkitState ) );
8289
}
8390

8491
@Setter

0 commit comments

Comments
 (0)