diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java index a35986683a..b1f09baeea 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java @@ -34,9 +34,24 @@ public class Runner { public static void main( String[] args ) throws InterruptedException { - boolean asyncMode = args.length > 0 && args[0].equals( "async" ); + TestkitRequestProcessorHandler.BackendMode backendMode; + String modeArg = args.length > 0 ? args[0] : null; + if ( "async".equals( modeArg ) ) + { + backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC; + } + else if ( "reactive".equals( modeArg ) ) + { + backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE; + } + else + { + backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC; + } + EventLoopGroup group = new NioEventLoopGroup(); try + { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group( group ) @@ -50,7 +65,7 @@ protected void initChannel( SocketChannel channel ) channel.pipeline().addLast( new TestkitMessageInboundHandler() ); channel.pipeline().addLast( new TestkitMessageOutboundHandler() ); channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() ); - channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) ); + channel.pipeline().addLast( new TestkitRequestProcessorHandler( backendMode ) ); } } ); ChannelFuture server = bootstrap.bind().sync(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java new file mode 100644 index 0000000000..7658123cf0 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import lombok.Getter; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CompletableFuture; + +public class RxBlockingSubscriber implements Subscriber +{ + @Getter + private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); + private CompletableFuture> nextSignalConsumerFuture; + + public void setNextSignalConsumer( CompletableFuture nextSignalConsumer ) + { + nextSignalConsumerFuture.complete( nextSignalConsumer ); + } + + @Override + public void onSubscribe( Subscription s ) + { + nextSignalConsumerFuture = new CompletableFuture<>(); + subscriptionFuture.complete( s ); + } + + @Override + public void onNext( T t ) + { + blockUntilNextSignalConsumer().complete( t ); + } + + @Override + public void onError( Throwable t ) + { + blockUntilNextSignalConsumer().completeExceptionally( t ); + } + + @Override + public void onComplete() + { + blockUntilNextSignalConsumer().complete( null ); + } + + private CompletableFuture blockUntilNextSignalConsumer() + { + CompletableFuture nextSignalConsumer; + try + { + nextSignalConsumer = nextSignalConsumerFuture.get(); + } + catch ( Throwable throwable ) + { + throw new RuntimeException( "Failed waiting for next signal consumer", throwable ); + } + nextSignalConsumerFuture = new CompletableFuture<>(); + return nextSignalConsumer; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java new file mode 100644 index 0000000000..0dfe5524e9 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.reactive.RxSession; + +@Getter +@Setter +public class RxSessionState +{ + public RxSession session; + public CompletableFuture txWorkFuture; + + public RxSessionState( RxSession session ) + { + this.session = session; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 535b177e2e..33671d533a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -22,6 +22,7 @@ import lombok.Getter; import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; @@ -30,12 +31,15 @@ import java.util.function.Consumer; import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.async.AsyncTransaction; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; @Getter public class TestkitState @@ -46,12 +50,17 @@ public class TestkitState private final Map routingTableRegistry = new HashMap<>(); private final Map sessionStates = new HashMap<>(); private final Map asyncSessionStates = new HashMap<>(); + private final Map rxSessionStates = new HashMap<>(); private final Map results = new HashMap<>(); private final Map resultCursors = new HashMap<>(); + private final Map rxResults = new HashMap<>(); + private final Map> rxResultIdToRecordSubscriber = new HashMap<>(); @Getter( AccessLevel.NONE ) private final Map transactions = new HashMap<>(); @Getter( AccessLevel.NONE ) private final Map asyncTransactions = new HashMap<>(); + @Getter( AccessLevel.NONE ) + private final Map rxTransactions = new HashMap<>(); private final Map errors = new HashMap<>(); @Getter( AccessLevel.NONE ) private final AtomicInteger idGenerator = new AtomicInteger( 0 ); @@ -101,4 +110,20 @@ public CompletableFuture getAsyncTransaction( String id ) } return CompletableFuture.completedFuture( asyncTransactions.get( id ) ); } + + public String addRxTransaction( RxTransaction transaction ) + { + String id = newId(); + this.rxTransactions.put( id, transaction ); + return id; + } + + public Mono getRxTransaction( String id ) + { + if ( !this.rxTransactions.containsKey( id ) ) + { + return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) ); + } + return Mono.just( rxTransactions.get( id ) ); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java index 5060bf5428..83b97b85be 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.function.BiFunction; import org.neo4j.driver.exceptions.Neo4jException; @@ -39,15 +41,24 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter { private final TestkitState testkitState = new TestkitState( this::writeAndFlush ); - private final BiFunction> processorImpl; + private final BiFunction> processorImpl; + // Some requests require multiple threads + private final Executor requestExecutorService = Executors.newFixedThreadPool( 10 ); private Channel channel; - public TestkitRequestProcessorHandler( boolean asyncMode ) + public TestkitRequestProcessorHandler( BackendMode backendMode ) { - if (asyncMode) { - processorImpl = (request, state) -> request.processAsync( state ); - } else { + switch ( backendMode ) + { + case ASYNC: processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest; + break; + case REACTIVE: + processorImpl = ( request, state ) -> request.processRx( state ).toFuture(); + break; + default: + processorImpl = TestkitRequest::processAsync; + break; } } @@ -62,20 +73,29 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception public void channelRead( ChannelHandlerContext ctx, Object msg ) { // Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking. - CompletableFuture.supplyAsync( () -> (TestkitRequest) msg ) - .thenCompose( request -> processorImpl.apply( request, testkitState ) ) - .thenApply( response -> - { - if ( response != null ) - { - ctx.writeAndFlush( response ); - } - return null; - } ).exceptionally( throwable -> - { - ctx.writeAndFlush( createErrorResponse( throwable ) ); - return null; - } ); + requestExecutorService.execute( () -> + { + try + { + TestkitRequest request = (TestkitRequest) msg; + CompletionStage responseStage = processorImpl.apply( request, testkitState ); + responseStage.whenComplete( ( response, throwable ) -> + { + if ( throwable != null ) + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + } + else if ( response != null ) + { + ctx.writeAndFlush( response ); + } + } ); + } + catch ( Throwable throwable ) + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + } + } ); } private static CompletionStage wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState ) @@ -145,4 +165,11 @@ private void writeAndFlush( TestkitResponse response ) } channel.writeAndFlush( response ); } + + public enum BackendMode + { + SYNC, + ASYNC, + REACTIVE + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index 7ecfecf886..332280064f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.MultiDBSupport; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -48,6 +49,12 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( this::createResponse ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.fromCompletionStage( processAsync( testkitState ) ); + } + private MultiDBSupport createResponse( boolean available ) { return MultiDBSupport.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index 1017e159ec..2ba3e2f762 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -47,6 +48,12 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( ignored -> createResponse() ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.fromCompletionStage( processAsync( testkitState ) ); + } + private Driver createResponse() { return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 7389f69d8d..ffed5b782c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.FeatureList; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.Collections; @@ -62,6 +63,12 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.just( createResponse( COMMON_FEATURES ) ); + } + private FeatureList createResponse( Set features ) { return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 2448c1a45d..41d2bf0d66 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RoutingTable; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.List; @@ -78,7 +79,13 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( process( testkitState ) ) ; + return CompletableFuture.completedFuture( process( testkitState ) ); + } + + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.just( process( testkitState ) ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index eaddb1f147..a919a10b85 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -29,6 +29,7 @@ import neo4j.org.testkit.backend.messages.responses.ResolverResolutionRequired; import neo4j.org.testkit.backend.messages.responses.TestkitCallback; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.net.InetAddress; import java.net.URI; @@ -118,6 +119,12 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( process( testkitState ) ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.fromCompletionStage( processAsync( testkitState ) ); + } + private ServerAddressResolver callbackResolver( TestkitState testkitState ) { return address -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index 889d18377c..0f21cbd287 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -21,10 +21,12 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; +import neo4j.org.testkit.backend.RxSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -54,11 +56,18 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ); + return CompletableFuture.completedFuture( + createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ); } - private TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, - Map sessionStateContainer ) + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.just( createSessionStateAndResponse( testkitState, this::createRxSessionState, testkitState.getRxSessionStates() ) ); + } + + protected TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, + Map sessionStateContainer ) { Driver driver = testkitState.getDrivers().get( data.getDriverId() ); AccessMode formattedAccessMode = data.getAccessMode().equals( "r" ) ? AccessMode.READ : AccessMode.WRITE; @@ -93,6 +102,11 @@ private AsyncSessionState createAsyncSessionState( Driver driver, SessionConfig return new AsyncSessionState( driver.asyncSession( sessionConfig ) ); } + private RxSessionState createRxSessionState( Driver driver, SessionConfig sessionConfig ) + { + return new RxSessionState( driver.rxSession( sessionConfig ) ); + } + @Setter @Getter public static class NewSessionBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index 11778a077b..fe65bbd8cc 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -24,11 +24,13 @@ import neo4j.org.testkit.backend.messages.responses.NullRecord; import neo4j.org.testkit.backend.messages.responses.Summary; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; +import org.neo4j.driver.reactive.RxResult; @Setter @Getter @@ -58,6 +60,14 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( this::createResponse ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxResult result = testkitState.getRxResults().get( data.getResultId() ); + return Mono.fromDirect( result.consume() ) + .map( this::createResponse ); + } + private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) { Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java index 07a3e62e2e..f0ec4d563c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java @@ -24,6 +24,7 @@ import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.RecordList; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.List; import java.util.concurrent.CompletionStage; @@ -49,6 +50,12 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( this::createResponse ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + throw new UnsupportedOperationException( "Operation not supported" ); + } + private RecordList createResponse( List records ) { List mappedRecords = records.stream() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index d79e422f53..012d84a715 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -20,15 +20,20 @@ import lombok.Getter; import lombok.Setter; +import neo4j.org.testkit.backend.RxBlockingSubscriber; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.NullRecord; import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Function; import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; +import org.neo4j.driver.reactive.RxResult; @Setter @Getter @@ -58,6 +63,44 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + CompletableFuture> subscriberFuture; + String resultId = data.getResultId(); + RxBlockingSubscriber currentSubscriber = testkitState.getRxResultIdToRecordSubscriber().get( resultId ); + + if ( currentSubscriber == null ) + { + RxBlockingSubscriber subscriber = new RxBlockingSubscriber<>(); + subscriberFuture = subscriber.getSubscriptionFuture().thenApply( subscription -> + { + subscription.request( 1000 ); + return subscriber; + } ); + testkitState.getRxResultIdToRecordSubscriber().put( resultId, subscriber ); + RxResult result = testkitState.getRxResults().get( resultId ); + result.records().subscribe( subscriber ); + } + else + { + subscriberFuture = CompletableFuture.completedFuture( currentSubscriber ); + } + + CompletableFuture responseFuture = subscriberFuture + .thenApply( recordsSubscriber -> + { + CompletableFuture recordConsumer = + new CompletableFuture<>(); + recordsSubscriber.setNextSignalConsumer( recordConsumer ); + return recordConsumer; + } ) + .thenCompose( Function.identity() ) + .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ); + + return Mono.fromCompletionStage( responseFuture ); + } + private Record createResponse( org.neo4j.driver.Record record ) { return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index bde0bff0ac..c412b358ba 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -21,9 +21,11 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; +import neo4j.org.testkit.backend.RxSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -72,6 +74,23 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( null ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionState.getTxWorkFuture().completeExceptionally( throwable ); + return Mono.empty(); + } + @Setter @Getter public static class RetryableNegativeBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index 2f0676ea91..d4f5b28b70 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -52,6 +53,13 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( null ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + testkitState.getRxSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); + return Mono.empty(); + } + @Setter @Getter public static class RetryablePositiveBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index c9a75aaa5e..0c12a6f61a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -21,10 +21,12 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; +import neo4j.org.testkit.backend.RxSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; +import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Map; @@ -34,6 +36,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.reactive.RxSession; @Setter @Getter @@ -86,6 +89,30 @@ public CompletionStage processAsync( TestkitState testkitState } } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); + if ( sessionState != null ) + { + RxSession session = sessionState.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } + + return Mono.fromDirect( session.beginTransaction( builder.build() ) ) + .map( tx -> transaction( testkitState.addRxTransaction( tx ) ) ); + } + else + { + return Mono.error( new RuntimeException( "Could not find session" ) ); + } + } + private Transaction transaction( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index d942a2cff5..0bbc94e1a7 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -47,6 +48,13 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( ignored -> createResponse() ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.fromDirect( testkitState.getRxSessionStates().get( data.getSessionId() ).getSession().close() ) + .then( Mono.just( createResponse() ) ); + } + private Session createResponse() { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index 1016a3ca41..5d4bedd22f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -24,6 +24,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Bookmarks; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -57,6 +58,13 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( createResponse( bookmark ) ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + Bookmark bookmark = testkitState.getRxSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); + return Mono.just( createResponse( bookmark ) ); + } + private Bookmarks createResponse( Bookmark bookmark ) { return Bookmarks.builder().data( Bookmarks.BookmarksBody.builder().bookmarks( bookmark ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index 134dfd83bd..284f7a1f56 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -21,11 +21,14 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; +import neo4j.org.testkit.backend.RxSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,6 +40,7 @@ import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.reactive.RxTransactionWork; @Setter @Getter @@ -75,6 +79,23 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( nothing -> retryableDone() ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); + RxTransactionWork> workWrapper = tx -> + { + String txId = testkitState.addRxTransaction( tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionState.setTxWorkFuture( tryResult ); + return Mono.fromCompletionStage( tryResult ); + }; + + return Mono.fromDirect( sessionState.getSession().readTransaction( workWrapper ) ) + .then( Mono.just( retryableDone() ) ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 5122793911..3928c5b44c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -25,6 +25,7 @@ import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.Result; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Map; @@ -35,6 +36,8 @@ import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxSession; @Setter @Getter @@ -53,10 +56,10 @@ public TestkitResponse process( TestkitState testkitState ) Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); org.neo4j.driver.Result result = session.run( query, transactionConfig.build() ); - String newId = testkitState.newId(); - testkitState.getResults().put( newId, result ); + String id = testkitState.newId(); + testkitState.getResults().put( id, result ); - return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); + return createResponse( id ); } @Override @@ -73,12 +76,38 @@ public CompletionStage processAsync( TestkitState testkitState return session.runAsync( query, transactionConfig.build() ) .thenApply( resultCursor -> { - String newId = testkitState.newId(); - testkitState.getResultCursors().put( newId, resultCursor ); - return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); + String id = testkitState.newId(); + testkitState.getResultCursors().put( id, resultCursor ); + return createResponse( id ); } ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxSession session = testkitState.getRxSessionStates().get( data.getSessionId() ).getSession(); + Query query = Optional.ofNullable( data.params ) + .map( params -> new Query( data.cypher, data.params ) ) + .orElseGet( () -> new Query( data.cypher ) ); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); + Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); + + RxResult result = session.run( query, transactionConfig.build() ); + String id = testkitState.newId(); + testkitState.getRxResults().put( id, result ); + + // The keys() method causes RUN message exchange. + // However, it does not currently report errors. + return Mono.fromDirect( result.keys() ) + .map( ignored -> createResponse( id ) ); + } + + private Result createResponse( String resultId ) + { + return Result.builder().data( Result.ResultBody.builder().id( resultId ).build() ).build(); + } + @Setter @Getter public static class SessionRunBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 4ceeac6a29..fda177afbd 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -21,11 +21,14 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; +import neo4j.org.testkit.backend.RxSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.Map; import java.util.Optional; @@ -38,6 +41,7 @@ import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.reactive.RxTransactionWork; @Setter @Getter @@ -77,6 +81,23 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( nothing -> retryableDone() ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); + RxTransactionWork> workWrapper = tx -> + { + String txId = testkitState.addRxTransaction( tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionState.setTxWorkFuture( tryResult ); + return Mono.fromCompletionStage( tryResult ); + }; + + return Mono.fromDirect( sessionState.getSession().writeTransaction( workWrapper ) ) + .then( Mono.just( retryableDone() ) ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index ed701746c8..2bd9afc67d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -24,6 +24,7 @@ import neo4j.org.testkit.backend.messages.responses.RunTest; import neo4j.org.testkit.backend.messages.responses.SkipTest; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; @@ -35,10 +36,75 @@ public class StartTest implements TestkitRequest { private static final Map ASYNC_SKIP_PATTERN_TO_REASON = new HashMap<>(); + private static final Map REACTIVE_SKIP_PATTERN_TO_REASON = new HashMap<>(); static { - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0$", "Does not error as expected" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*\\.test_should_reject_server_using_verify_connectivity_bolt_3x0$", "Does not error as expected" ); + + // V3 tests + String skipMessage = "v3 is not applicable to reactive"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestAuthorizationV3\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestBookmarksV3\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRoutingV3\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.RoutingV3\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_should_reject_server_using_verify_connectivity_bolt_3x0$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_supports_bolt_3x0", skipMessage ); + + // Current limitations (require further investigation or bug fixing) + skipMessage = "Does not report RUN FAILURE"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_write_successfully_on_leader_switch_using_tx_function$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_after_hello$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_run$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_on_tx_run$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetry\\..*$", "Unfinished results consumption" ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetryClustering\\..*$", "Unfinished results consumption" ); + skipMessage = "Does not support PULL pipelining"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_read_tx_and_rediscovery_until_success$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_read_tx_until_success_on_error$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_tx_and_rediscovery_until_success$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_tx_until_success_on_error$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_session_run$", + skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_tx_run$", + skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_session_run$", + skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_using_tx_function$", + skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_using_tx_function$", + skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_change_using_tx_function$", + skipMessage ); + skipMessage = "Custom fetch size not supported"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_accept_custom_fetch_size_using_driver_configuration$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_pull_all_when_fetch_is_minus_one_using_driver_configuration", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_pull_custom_size_and_then_all_using_session_configuration$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_accept_custom_fetch_size_using_session_configuration$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_consumed_result$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function$", + "Commit failure leaks outside function" ); + skipMessage = "Requires investigation"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestAuthorizationV4x1\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestAuthorizationV4x3\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestNoRoutingAuthorization\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_unfinished_result$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_untouched_result$", skipMessage ); } private StartTestBody data; @@ -67,6 +133,24 @@ public CompletionStage processAsync( TestkitState testkitState return CompletableFuture.completedFuture( testkitResponse ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + TestkitResponse testkitResponse = REACTIVE_SKIP_PATTERN_TO_REASON + .entrySet() + .stream() + .filter( entry -> data.getTestName().matches( entry.getKey() ) ) + .findFirst() + .map( entry -> (TestkitResponse) SkipTest.builder() + .data( SkipTest.SkipTestBody.builder() + .reason( entry.getValue() ) + .build() ) + .build() ) + .orElseGet( () -> RunTest.builder().build() ); + + return Mono.fromCompletionStage( CompletableFuture.completedFuture( testkitResponse ) ); + } + @Setter @Getter public static class StartTestBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java index b622c21b97..0a9d0ca0ba 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java @@ -21,6 +21,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitCallback; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -45,4 +46,11 @@ default CompletionStage processAsync( TestkitState testkitState testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this ); return CompletableFuture.completedFuture( null ); } + + @Override + default Mono processRx( TestkitState testkitState ) + { + testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this ); + return Mono.empty(); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index e9348fd94e..96ba563352 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -46,4 +47,6 @@ public interface TestkitRequest TestkitResponse process( TestkitState testkitState ); CompletionStage processAsync( TestkitState testkitState ); + + Mono processRx( TestkitState testkitState ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index 649d69cbe3..ef2ccddf13 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -45,6 +46,12 @@ public CompletionStage processAsync( TestkitState testkitState throw new UnsupportedOperationException(); } + @Override + public Mono processRx( TestkitState testkitState ) + { + throw new UnsupportedOperationException( "Operation not supported" ); + } + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index 7cbc704157..eb76d4b650 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -23,9 +23,12 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.async.AsyncTransaction; + @Getter @Setter public class TransactionCommit implements TestkitRequest @@ -42,7 +45,16 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( tx -> tx.commitAsync() ).thenApply( ignored -> createResponse( data.getTxId() ) ); + return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( AsyncTransaction::commitAsync ) + .thenApply( ignored -> createResponse( data.getTxId() ) ); + } + + @Override + public Mono processRx( TestkitState testkitState ) + { + return testkitState.getRxTransaction( data.getTxId() ) + .flatMap( tx -> Mono.fromDirect( tx.commit() ) ) + .then( Mono.just( createResponse( data.getTxId() ) ) ); } private Transaction createResponse( String txId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index f9be772df5..c695f1d870 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -23,9 +23,12 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.async.AsyncTransaction; + @Getter @Setter public class TransactionRollback implements TestkitRequest @@ -42,7 +45,16 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( tx -> tx.rollbackAsync() ).thenApply( ignored -> createResponse( data.getTxId() ) ); + return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( AsyncTransaction::rollbackAsync ) + .thenApply( ignored -> createResponse( data.getTxId() ) ); + } + + @Override + public Mono processRx( TestkitState testkitState ) + { + return testkitState.getRxTransaction( data.getTxId() ) + .flatMap( tx -> Mono.fromDirect( tx.rollback() ) ) + .then( Mono.just( createResponse( data.getTxId() ) ) ); } private Transaction createResponse( String txId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index d25bb2cbaa..f61b2d7824 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -25,16 +25,19 @@ import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.Result; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.reactive.RxResult; + @Setter @Getter public class TransactionRun implements TestkitRequest { - private TransactionRunBody data; + protected TransactionRunBody data; @Override public TestkitResponse process( TestkitState testkitState ) @@ -50,16 +53,32 @@ public TestkitResponse process( TestkitState testkitState ) public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getAsyncTransaction( data.getTxId() ) - .thenCompose( tx -> tx.runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() )) - .thenApply( resultCursor -> - { - String resultId = testkitState.newId(); - testkitState.getResultCursors().put( resultId, resultCursor ); - return createResponse( resultId ); - } ) ; + .thenCompose( tx -> tx.runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) + .thenApply( resultCursor -> + { + String resultId = testkitState.newId(); + testkitState.getResultCursors().put( resultId, resultCursor ); + return createResponse( resultId ); + } ); + } + + @Override + public Mono processRx( TestkitState testkitState ) + { + String resultId = testkitState.newId(); + return testkitState.getRxTransaction( data.getTxId() ) + .flatMap( tx -> + { + RxResult result = tx.run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ); + testkitState.getRxResults().put( resultId, result ); + // The keys() method causes RUN message exchange. + // However, it does not currently report errors. + return Mono.fromDirect( result.keys() ); + } ) + .map( ignored -> createResponse( resultId ) ); } - private Result createResponse( String resultId ) + protected Result createResponse( String resultId ) { return Result.builder().data( Result.ResultBody.builder().id( resultId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index 3cc2ba28ba..7a03d43ae8 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -23,6 +23,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; @@ -49,6 +50,12 @@ public CompletionStage processAsync( TestkitState testkitState .thenApply( ignored -> createResponse( id ) ); } + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.fromCompletionStage( processAsync( testkitState ) ); + } + private Driver createResponse( String id ) { return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index 83c5386cfc..3e421ea3dd 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -24,6 +24,9 @@ --tests TESTKIT_TESTS INTEGRATION_TESTS STUB_TESTS STRESS_TESTS TLS_TESTS 7200000 %a-async + %a-rx + + 0.36.1 true @@ -73,6 +76,7 @@ java ${rootDir} true + ${testkit.debug.reqres} @@ -137,6 +141,34 @@ + + + run-testkit-rx + integration-test + + + start + + + + + tklnchr + + ${testkit.rx.name.pattern} + + ${project.build.directory}/testkit-rx + reactive + + --configs 4.2-cluster,4.0-community,4.1-enterprise ${testkit.args} + + + ${testkit.rx.name.pattern}> + + + + + + remove-testkit-launcher post-integration-test