Skip to content

Add reactive backend support #998

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,24 @@ public class Runner
{
public static void main( String[] args ) throws InterruptedException
{
boolean asyncMode = args.length > 0 && args[0].equals( "async" );
TestkitRequestProcessorHandler.BackendMode backendMode;
String modeArg = args.length > 0 ? args[0] : null;
if ( "async".equals( modeArg ) )
{
backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC;
}
else if ( "reactive".equals( modeArg ) )
{
backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE;
}
else
{
backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC;
}

EventLoopGroup group = new NioEventLoopGroup();
try

{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group( group )
Expand All @@ -50,7 +65,7 @@ protected void initChannel( SocketChannel channel )
channel.pipeline().addLast( new TestkitMessageInboundHandler() );
channel.pipeline().addLast( new TestkitMessageOutboundHandler() );
channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() );
channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) );
channel.pipeline().addLast( new TestkitRequestProcessorHandler( backendMode ) );
}
} );
ChannelFuture server = bootstrap.bind().sync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend;

import lombok.Getter;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;

public class RxBlockingSubscriber<T> implements Subscriber<T>
{
@Getter
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;

public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
{
nextSignalConsumerFuture.complete( nextSignalConsumer );
}

@Override
public void onSubscribe( Subscription s )
{
nextSignalConsumerFuture = new CompletableFuture<>();
subscriptionFuture.complete( s );
}

@Override
public void onNext( T t )
{
blockUntilNextSignalConsumer().complete( t );
}

@Override
public void onError( Throwable t )
{
blockUntilNextSignalConsumer().completeExceptionally( t );
}

@Override
public void onComplete()
{
blockUntilNextSignalConsumer().complete( null );
}

private CompletableFuture<T> blockUntilNextSignalConsumer()
{
CompletableFuture<T> nextSignalConsumer;
try
{
nextSignalConsumer = nextSignalConsumerFuture.get();
}
catch ( Throwable throwable )
{
throw new RuntimeException( "Failed waiting for next signal consumer", throwable );
}
nextSignalConsumerFuture = new CompletableFuture<>();
return nextSignalConsumer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend;

import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.reactive.RxSession;

@Getter
@Setter
public class RxSessionState
{
public RxSession session;
public CompletableFuture<Void> txWorkFuture;

public RxSessionState( RxSession session )
{
this.session = session;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.Getter;
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -30,12 +31,15 @@
import java.util.function.Consumer;

import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;

@Getter
public class TestkitState
Expand All @@ -46,12 +50,17 @@ public class TestkitState
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
private final Map<String,SessionState> sessionStates = new HashMap<>();
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
private final Map<String,Result> results = new HashMap<>();
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
private final Map<String,RxResult> rxResults = new HashMap<>();
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,Transaction> transactions = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
private final Map<String,Neo4jException> errors = new HashMap<>();
@Getter( AccessLevel.NONE )
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
Expand Down Expand Up @@ -101,4 +110,20 @@ public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
}
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
}

public String addRxTransaction( RxTransaction transaction )
{
String id = newId();
this.rxTransactions.put( id, transaction );
return id;
}

public Mono<RxTransaction> getRxTransaction( String id )
{
if ( !this.rxTransactions.containsKey( id ) )
{
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
}
return Mono.just( rxTransactions.get( id ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

import org.neo4j.driver.exceptions.Neo4jException;
Expand All @@ -39,15 +41,24 @@
public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
{
private final TestkitState testkitState = new TestkitState( this::writeAndFlush );
private final BiFunction<TestkitRequest, TestkitState, CompletionStage<TestkitResponse>> processorImpl;
private final BiFunction<TestkitRequest,TestkitState,CompletionStage<TestkitResponse>> processorImpl;
// Some requests require multiple threads
private final Executor requestExecutorService = Executors.newFixedThreadPool( 10 );
private Channel channel;

public TestkitRequestProcessorHandler( boolean asyncMode )
public TestkitRequestProcessorHandler( BackendMode backendMode )
{
if (asyncMode) {
processorImpl = (request, state) -> request.processAsync( state );
} else {
switch ( backendMode )
{
case ASYNC:
processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
break;
case REACTIVE:
processorImpl = ( request, state ) -> request.processRx( state ).toFuture();
break;
default:
processorImpl = TestkitRequest::processAsync;
break;
}
}

Expand All @@ -62,20 +73,29 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
public void channelRead( ChannelHandlerContext ctx, Object msg )
{
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
CompletableFuture.supplyAsync( () -> (TestkitRequest) msg )
.thenCompose( request -> processorImpl.apply( request, testkitState ) )
.thenApply( response ->
{
if ( response != null )
{
ctx.writeAndFlush( response );
}
return null;
} ).exceptionally( throwable ->
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
return null;
} );
requestExecutorService.execute( () ->
{
try
{
TestkitRequest request = (TestkitRequest) msg;
CompletionStage<TestkitResponse> responseStage = processorImpl.apply( request, testkitState );
responseStage.whenComplete( ( response, throwable ) ->
{
if ( throwable != null )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
else if ( response != null )
{
ctx.writeAndFlush( response );
}
} );
}
catch ( Throwable throwable )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
} );
}

private static CompletionStage<TestkitResponse> wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState )
Expand Down Expand Up @@ -145,4 +165,11 @@ private void writeAndFlush( TestkitResponse response )
}
channel.writeAndFlush( response );
}

public enum BackendMode
{
SYNC,
ASYNC,
REACTIVE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.MultiDBSupport;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletionStage;

Expand All @@ -48,6 +49,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
.thenApply( this::createResponse );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.fromCompletionStage( processAsync( testkitState ) );
}

private MultiDBSupport createResponse( boolean available )
{
return MultiDBSupport.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.Driver;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletionStage;

Expand All @@ -47,6 +48,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
.thenApply( ignored -> createResponse() );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.fromCompletionStage( processAsync( testkitState ) );
}

private Driver createResponse()
{
return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.FeatureList;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -62,6 +63,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.just( createResponse( COMMON_FEATURES ) );
}

private FeatureList createResponse( Set<String> features )
{
return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.RoutingTable;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -78,7 +79,13 @@ public TestkitResponse process( TestkitState testkitState )
@Override
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
{
return CompletableFuture.completedFuture( process( testkitState ) ) ;
return CompletableFuture.completedFuture( process( testkitState ) );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.just( process( testkitState ) );
}

@Setter
Expand Down
Loading