Skip to content

Commit ddf14e0

Browse files
committed
Add reactive backend support
This update brings reactive backend support. Its scope is limited to providing partial transparent support for existing test cases. More updates are expected in future PRs.
1 parent 4238f82 commit ddf14e0

31 files changed

+660
-43
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,24 @@ public class Runner
3434
{
3535
public static void main( String[] args ) throws InterruptedException
3636
{
37-
boolean asyncMode = args.length > 0 && args[0].equals( "async" );
37+
TestkitRequestProcessorHandler.BackendMode backendMode;
38+
String modeArg = args.length > 0 ? args[0] : null;
39+
if ( "async".equals( modeArg ) )
40+
{
41+
backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC;
42+
}
43+
else if ( "reactive".equals( modeArg ) )
44+
{
45+
backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE;
46+
}
47+
else
48+
{
49+
backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC;
50+
}
51+
3852
EventLoopGroup group = new NioEventLoopGroup();
3953
try
54+
4055
{
4156
ServerBootstrap bootstrap = new ServerBootstrap();
4257
bootstrap.group( group )
@@ -50,7 +65,7 @@ protected void initChannel( SocketChannel channel )
5065
channel.pipeline().addLast( new TestkitMessageInboundHandler() );
5166
channel.pipeline().addLast( new TestkitMessageOutboundHandler() );
5267
channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() );
53-
channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) );
68+
channel.pipeline().addLast( new TestkitRequestProcessorHandler( backendMode ) );
5469
}
5570
} );
5671
ChannelFuture server = bootstrap.bind().sync();
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import lombok.Getter;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
27+
public class RxBlockingSubscriber<T> implements Subscriber<T>
28+
{
29+
@Getter
30+
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
31+
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;
32+
33+
public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
34+
{
35+
nextSignalConsumerFuture.complete( nextSignalConsumer );
36+
}
37+
38+
@Override
39+
public void onSubscribe( Subscription s )
40+
{
41+
nextSignalConsumerFuture = new CompletableFuture<>();
42+
subscriptionFuture.complete( s );
43+
}
44+
45+
@Override
46+
public void onNext( T t )
47+
{
48+
blockUntilNextSignalConsumer().complete( t );
49+
}
50+
51+
@Override
52+
public void onError( Throwable t )
53+
{
54+
blockUntilNextSignalConsumer().completeExceptionally( t );
55+
}
56+
57+
@Override
58+
public void onComplete()
59+
{
60+
blockUntilNextSignalConsumer().complete( null );
61+
}
62+
63+
private CompletableFuture<T> blockUntilNextSignalConsumer()
64+
{
65+
CompletableFuture<T> nextSignalConsumer;
66+
try
67+
{
68+
nextSignalConsumer = nextSignalConsumerFuture.get();
69+
}
70+
catch ( Throwable throwable )
71+
{
72+
throw new RuntimeException( "Failed waiting for next signal consumer", throwable );
73+
}
74+
nextSignalConsumerFuture = new CompletableFuture<>();
75+
return nextSignalConsumer;
76+
}
77+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import lombok.Getter;
22+
import lombok.Setter;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
26+
import org.neo4j.driver.reactive.RxSession;
27+
28+
@Getter
29+
@Setter
30+
public class RxSessionState
31+
{
32+
public RxSession session;
33+
public CompletableFuture<Void> txWorkFuture;
34+
35+
public RxSessionState( RxSession session )
36+
{
37+
this.session = session;
38+
}
39+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import lombok.Getter;
2323
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
2424
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
25+
import reactor.core.publisher.Mono;
2526

2627
import java.util.HashMap;
2728
import java.util.Map;
@@ -30,12 +31,15 @@
3031
import java.util.function.Consumer;
3132

3233
import org.neo4j.driver.Driver;
34+
import org.neo4j.driver.Record;
3335
import org.neo4j.driver.Result;
3436
import org.neo4j.driver.Transaction;
3537
import org.neo4j.driver.async.AsyncTransaction;
3638
import org.neo4j.driver.async.ResultCursor;
3739
import org.neo4j.driver.exceptions.Neo4jException;
3840
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
41+
import org.neo4j.driver.reactive.RxResult;
42+
import org.neo4j.driver.reactive.RxTransaction;
3943

4044
@Getter
4145
public class TestkitState
@@ -46,12 +50,17 @@ public class TestkitState
4650
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
4751
private final Map<String,SessionState> sessionStates = new HashMap<>();
4852
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
53+
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
4954
private final Map<String,Result> results = new HashMap<>();
5055
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
56+
private final Map<String,RxResult> rxResults = new HashMap<>();
57+
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
5158
@Getter( AccessLevel.NONE )
5259
private final Map<String,Transaction> transactions = new HashMap<>();
5360
@Getter( AccessLevel.NONE )
5461
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
62+
@Getter( AccessLevel.NONE )
63+
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
5564
private final Map<String,Neo4jException> errors = new HashMap<>();
5665
@Getter( AccessLevel.NONE )
5766
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
@@ -101,4 +110,20 @@ public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
101110
}
102111
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
103112
}
113+
114+
public String addRxTransaction( RxTransaction transaction )
115+
{
116+
String id = newId();
117+
this.rxTransactions.put( id, transaction );
118+
return id;
119+
}
120+
121+
public Mono<RxTransaction> getRxTransaction( String id )
122+
{
123+
if ( !this.rxTransactions.containsKey( id ) )
124+
{
125+
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
126+
}
127+
return Mono.just( rxTransactions.get( id ) );
128+
}
104129
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.CompletionException;
3232
import java.util.concurrent.CompletionStage;
33+
import java.util.concurrent.Executor;
34+
import java.util.concurrent.Executors;
3335
import java.util.function.BiFunction;
3436

3537
import org.neo4j.driver.exceptions.Neo4jException;
@@ -39,15 +41,24 @@
3941
public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4042
{
4143
private final TestkitState testkitState = new TestkitState( this::writeAndFlush );
42-
private final BiFunction<TestkitRequest, TestkitState, CompletionStage<TestkitResponse>> processorImpl;
44+
private final BiFunction<TestkitRequest,TestkitState,CompletionStage<TestkitResponse>> processorImpl;
45+
// Some requests require multiple threads
46+
private final Executor requestExecutorService = Executors.newFixedThreadPool( 10 );
4347
private Channel channel;
4448

45-
public TestkitRequestProcessorHandler( boolean asyncMode )
49+
public TestkitRequestProcessorHandler( BackendMode backendMode )
4650
{
47-
if (asyncMode) {
48-
processorImpl = (request, state) -> request.processAsync( state );
49-
} else {
51+
switch ( backendMode )
52+
{
53+
case ASYNC:
5054
processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
55+
break;
56+
case REACTIVE:
57+
processorImpl = ( request, state ) -> request.processRx( state ).toFuture();
58+
break;
59+
default:
60+
processorImpl = TestkitRequest::processAsync;
61+
break;
5162
}
5263
}
5364

@@ -62,20 +73,29 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
6273
public void channelRead( ChannelHandlerContext ctx, Object msg )
6374
{
6475
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
65-
CompletableFuture.supplyAsync( () -> (TestkitRequest) msg )
66-
.thenCompose( request -> processorImpl.apply( request, testkitState ) )
67-
.thenApply( response ->
68-
{
69-
if ( response != null )
70-
{
71-
ctx.writeAndFlush( response );
72-
}
73-
return null;
74-
} ).exceptionally( throwable ->
75-
{
76-
ctx.writeAndFlush( createErrorResponse( throwable ) );
77-
return null;
78-
} );
76+
requestExecutorService.execute( () ->
77+
{
78+
try
79+
{
80+
TestkitRequest request = (TestkitRequest) msg;
81+
CompletionStage<TestkitResponse> responseStage = processorImpl.apply( request, testkitState );
82+
responseStage.whenComplete( ( response, throwable ) ->
83+
{
84+
if ( throwable != null )
85+
{
86+
ctx.writeAndFlush( createErrorResponse( throwable ) );
87+
}
88+
else if ( response != null )
89+
{
90+
ctx.writeAndFlush( response );
91+
}
92+
} );
93+
}
94+
catch ( Throwable throwable )
95+
{
96+
ctx.writeAndFlush( createErrorResponse( throwable ) );
97+
}
98+
} );
7999
}
80100

81101
private static CompletionStage<TestkitResponse> wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState )
@@ -145,4 +165,11 @@ private void writeAndFlush( TestkitResponse response )
145165
}
146166
channel.writeAndFlush( response );
147167
}
168+
169+
public enum BackendMode
170+
{
171+
SYNC,
172+
ASYNC,
173+
REACTIVE
174+
}
148175
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.MultiDBSupport;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.concurrent.CompletionStage;
2829

@@ -48,6 +49,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
4849
.thenApply( this::createResponse );
4950
}
5051

52+
@Override
53+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
54+
{
55+
return Mono.fromCompletionStage( processAsync( testkitState ) );
56+
}
57+
5158
private MultiDBSupport createResponse( boolean available )
5259
{
5360
return MultiDBSupport.builder()

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.Driver;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.concurrent.CompletionStage;
2829

@@ -47,6 +48,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
4748
.thenApply( ignored -> createResponse() );
4849
}
4950

51+
@Override
52+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
53+
{
54+
return Mono.fromCompletionStage( processAsync( testkitState ) );
55+
}
56+
5057
private Driver createResponse()
5158
{
5259
return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build();

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.FeatureList;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.Arrays;
2829
import java.util.Collections;
@@ -62,6 +63,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
6263
return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) );
6364
}
6465

66+
@Override
67+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
68+
{
69+
return Mono.just( createResponse( COMMON_FEATURES ) );
70+
}
71+
6572
private FeatureList createResponse( Set<String> features )
6673
{
6774
return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build();

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import neo4j.org.testkit.backend.TestkitState;
2424
import neo4j.org.testkit.backend.messages.responses.RoutingTable;
2525
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
26+
import reactor.core.publisher.Mono;
2627

2728
import java.util.Arrays;
2829
import java.util.List;
@@ -78,7 +79,13 @@ public TestkitResponse process( TestkitState testkitState )
7879
@Override
7980
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
8081
{
81-
return CompletableFuture.completedFuture( process( testkitState ) ) ;
82+
return CompletableFuture.completedFuture( process( testkitState ) );
83+
}
84+
85+
@Override
86+
public Mono<TestkitResponse> processRx( TestkitState testkitState )
87+
{
88+
return Mono.just( process( testkitState ) );
8289
}
8390

8491
@Setter

0 commit comments

Comments
 (0)