diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java deleted file mode 100644 index 495624e5a0..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import neo4j.org.testkit.backend.channel.handler.TestkitMessageInboundHandler; -import neo4j.org.testkit.backend.channel.handler.TestkitMessageOutboundHandler; -import neo4j.org.testkit.backend.channel.handler.TestkitRequestProcessorHandler; -import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; - -public class AsyncBackendServer -{ - public void run() throws InterruptedException - { - EventLoopGroup group = new NioEventLoopGroup(); - try - { - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group( group ) - .channel( NioServerSocketChannel.class ) - .localAddress( 9876 ) - .childHandler( new ChannelInitializer() - { - @Override - protected void initChannel( SocketChannel channel ) - { - channel.pipeline().addLast( new TestkitMessageInboundHandler() ); - channel.pipeline().addLast( new TestkitMessageOutboundHandler() ); - channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() ); - channel.pipeline().addLast( new TestkitRequestProcessorHandler() ); - } - } ); - ChannelFuture server = bootstrap.bind().sync(); - server.channel().closeFuture().sync(); - } - finally - { - group.shutdownGracefully().sync(); - } - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java deleted file mode 100644 index c095517a8e..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.UncheckedIOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.CompletableFuture; - -public class BackendServer -{ - public void run() throws IOException - { - ServerSocket serverSocket = new ServerSocket( 9876 ); - - System.out.println( "Java TestKit Backend Started on port: " + serverSocket.getLocalPort() ); - - while ( true ) - { - final Socket clientSocket = serverSocket.accept(); - CompletableFuture.runAsync( () -> handleClient( clientSocket ) ); - } - } - - private void handleClient( Socket clientSocket ) - { - try - { - System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() ); - BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) ); - BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) ); - CommandProcessor commandProcessor = new DefaultCommandProcessor( in, out ); - - boolean cont = true; - while ( cont ) - { - try - { - cont = commandProcessor.process(); - } - catch ( Exception e ) - { - e.printStackTrace(); - clientSocket.close(); - cont = false; - } - } - } - catch ( IOException ex ) - { - throw new UncheckedIOException( ex ); - } - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/CommandProcessor.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/CommandProcessor.java deleted file mode 100644 index 2669ec718d..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/CommandProcessor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import neo4j.org.testkit.backend.messages.TestkitModule; - -import java.util.Collections; - -public interface CommandProcessor -{ - /** - * Used in ObjectMapper's injectable values. - */ - String COMMAND_PROCESSOR_ID = "commandProcessor"; - - /** - * Reads one request and writes the response. Returns false when not able to read anymore. - * - * @return False when there's nothing to read anymore. - */ - boolean process(); - - /** - * Create a new {@link ObjectMapper} configured with the appropriate testkit module and an injectable {@link CommandProcessor}. - * @param processor The processor supposed to be injectable - * @return A reusable object mapper instance - */ - static ObjectMapper newObjectMapperFor(CommandProcessor processor) { - - final ObjectMapper objectMapper = new ObjectMapper(); - - TestkitModule testkitModule = new TestkitModule(); - objectMapper.registerModule( testkitModule ); - objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES ); - - InjectableValues injectableValues = new InjectableValues.Std( Collections.singletonMap( COMMAND_PROCESSOR_ID, processor ) ); - objectMapper.setInjectableValues( injectableValues ); - return objectMapper; - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/DefaultCommandProcessor.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/DefaultCommandProcessor.java deleted file mode 100644 index 292f1c2fbc..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/DefaultCommandProcessor.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import neo4j.org.testkit.backend.messages.requests.TestkitRequest; -import neo4j.org.testkit.backend.messages.responses.BackendError; -import neo4j.org.testkit.backend.messages.responses.DriverError; -import neo4j.org.testkit.backend.messages.responses.TestkitResponse; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.UncheckedIOException; - -import org.neo4j.driver.exceptions.Neo4jException; -import org.neo4j.driver.exceptions.UntrustedServerException; -import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; - -class DefaultCommandProcessor implements CommandProcessor -{ - private final TestkitState testkitState; - - private final ObjectMapper objectMapper; - - private final BufferedReader in; - private final BufferedWriter out; - - DefaultCommandProcessor( BufferedReader in, BufferedWriter out ) - { - this.in = in; - this.out = out; - this.objectMapper = CommandProcessor.newObjectMapperFor( this ); - this.testkitState = new TestkitState( this::writeResponse ); - } - - private String readLine() - { - try - { - return this.in.readLine(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } - - private void write( String s ) - { - try - { - this.out.write( s ); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } - - // Logs to frontend - private void log( String s ) - { - try - { - this.out.write( s + "\n" ); - this.out.flush(); - } - catch ( IOException e ) - { - } - System.out.println( s ); - } - - private void flush() - { - try - { - this.out.flush(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } - - @Override - public boolean process() - { - boolean inRequest = false; - StringBuilder request = new StringBuilder(); - - log( "Waiting for request" ); - - while ( true ) - { - String currentLine = readLine(); - // End of stream - if ( currentLine == null ) - { - return false; - } - - if ( currentLine.equals( "#request begin" ) ) - { - inRequest = true; - } - else if ( currentLine.equals( "#request end" ) ) - { - if ( !inRequest ) - { - throw new RuntimeException( "Request end not expected" ); - } - try - { - processRequest( request.toString() ); - } - catch ( Exception e ) - { - if ( e instanceof Neo4jException ) - { - // Error to track - String id = testkitState.newId(); - testkitState.getErrors().put( id, (Neo4jException) e ); - writeResponse( driverError( id, (Neo4jException) e ) ); - System.out.println( "Neo4jException: " + e ); - } - else if ( isConnectionPoolClosedException( e ) || e instanceof UntrustedServerException ) - { - String id = testkitState.newId(); - DriverError driverError = DriverError.builder() - .data( - DriverError.DriverErrorBody.builder() - .id( id ) - .errorType( e.getClass().getName() ) - .msg( e.getMessage() ) - .build() - ) - .build(); - writeResponse( driverError ); - } - else - { - // Unknown error, interpret this as a backend error. - // Report to frontend and rethrow, note that if socket been - // closed the writing will throw itself... - writeResponse( BackendError.builder().data( BackendError.BackendErrorBody.builder().msg( e.toString() ).build() ).build() ); - // This won't print if there was an IO exception since line above will rethrow - e.printStackTrace(); - throw e; - } - } - return true; - } - else - { - if ( !inRequest ) - { - throw new RuntimeException( "Command Received whilst not in request" ); - } - request.append( currentLine ); - } - } - } - - private DriverError driverError( String id, Neo4jException e ) - { - return DriverError.builder().data( - DriverError.DriverErrorBody.builder() - .id( id ) - .errorType( e.getClass().getName() ) - .code( e.code() ) - .msg( e.getMessage() ) - .build() ) - .build(); - } - - public void processRequest( String request ) - { - System.out.println( "request = " + request + ", in = " + in + ", out = " + out ); - try - { - TestkitRequest testkitMessage = objectMapper.readValue( request, TestkitRequest.class ); - TestkitResponse response = testkitMessage.process( testkitState ); - if ( response != null ) - { - writeResponse( response ); - } - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } - - private void writeResponse( TestkitResponse response ) - { - try - { - String responseStr = objectMapper.writeValueAsString( response ); - System.out.println("response = " + responseStr + ", in = " + in + ", out = " + out); - write( "#response begin\n" ); - write( responseStr + "\n" ); - write( "#response end\n" ); - flush(); - } - catch ( JsonProcessingException ex ) - { - throw new RuntimeException( "Error writing response", ex ); - } - } - - private boolean isConnectionPoolClosedException( Exception e ) - { - return e instanceof IllegalStateException && e.getMessage() != null && - e.getMessage().equals( ConnectionPoolImpl.CONNECTION_POOL_CLOSED_ERROR_MESSAGE ); - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java index 2c60fb8cd2..a35986683a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java @@ -18,19 +18,47 @@ */ package neo4j.org.testkit.backend; -import java.io.IOException; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageInboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageOutboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestProcessorHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; public class Runner { - public static void main( String[] args ) throws IOException, InterruptedException + public static void main( String[] args ) throws InterruptedException { - if ( args.length > 0 && args[0].equals( "async" ) ) + boolean asyncMode = args.length > 0 && args[0].equals( "async" ); + EventLoopGroup group = new NioEventLoopGroup(); + try { - new AsyncBackendServer().run(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group( group ) + .channel( NioServerSocketChannel.class ) + .localAddress( 9876 ) + .childHandler( new ChannelInitializer() + { + @Override + protected void initChannel( SocketChannel channel ) + { + channel.pipeline().addLast( new TestkitMessageInboundHandler() ); + channel.pipeline().addLast( new TestkitMessageOutboundHandler() ); + channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() ); + channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) ); + } + } ); + ChannelFuture server = bootstrap.bind().sync(); + server.channel().closeFuture().sync(); } - else + finally { - new BackendServer().run(); + group.shutdownGracefully().sync(); } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java index d0bfe16733..9fc142b251 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java @@ -21,6 +21,8 @@ import lombok.Getter; import lombok.Setter; +import java.util.concurrent.CompletableFuture; + import org.neo4j.driver.Session; @Getter @@ -28,12 +30,9 @@ public class SessionState { public Session session; - public int retryableState; - public String retryableErrorId; + public CompletableFuture txWorkFuture; public SessionState(Session session) { this.session = session; - this.retryableState = 0; - this.retryableErrorId = ""; } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 0b2184f553..535b177e2e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -40,13 +40,17 @@ @Getter public class TestkitState { + private static final String TRANSACTION_NOT_FOUND_MESSAGE = "Could not find transaction"; + private final Map drivers = new HashMap<>(); private final Map routingTableRegistry = new HashMap<>(); private final Map sessionStates = new HashMap<>(); private final Map asyncSessionStates = new HashMap<>(); private final Map results = new HashMap<>(); private final Map resultCursors = new HashMap<>(); + @Getter( AccessLevel.NONE ) private final Map transactions = new HashMap<>(); + @Getter( AccessLevel.NONE ) private final Map asyncTransactions = new HashMap<>(); private final Map errors = new HashMap<>(); @Getter( AccessLevel.NONE ) @@ -63,4 +67,38 @@ public String newId() { return String.valueOf( idGenerator.getAndIncrement() ); } + + public String addTransaction( Transaction transaction ) + { + String id = newId(); + this.transactions.put( id, transaction ); + return id; + } + + public Transaction getTransaction( String id ) + { + if ( !this.transactions.containsKey( id ) ) + { + throw new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ); + } + return this.transactions.get( id ); + } + + public String addAsyncTransaction( AsyncTransaction transaction ) + { + String id = newId(); + this.asyncTransactions.put( id, transaction ); + return id; + } + + public CompletableFuture getAsyncTransaction( String id ) + { + if ( !this.asyncTransactions.containsKey( id ) ) + { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) ); + return future; + } + return CompletableFuture.completedFuture( asyncTransactions.get( id ) ); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java index 99027e36e3..5060bf5428 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -29,6 +29,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.exceptions.UntrustedServerException; @@ -37,8 +39,18 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter { private final TestkitState testkitState = new TestkitState( this::writeAndFlush ); + private final BiFunction> processorImpl; private Channel channel; + public TestkitRequestProcessorHandler( boolean asyncMode ) + { + if (asyncMode) { + processorImpl = (request, state) -> request.processAsync( state ); + } else { + processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest; + } + } + @Override public void channelRegistered( ChannelHandlerContext ctx ) throws Exception { @@ -49,26 +61,35 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) { - TestkitRequest testkitRequest = (TestkitRequest) msg; // Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking. - CompletableFuture.runAsync( - () -> + CompletableFuture.supplyAsync( () -> (TestkitRequest) msg ) + .thenCompose( request -> processorImpl.apply( request, testkitState ) ) + .thenApply( response -> { - try + if ( response != null ) { - testkitRequest.processAsync( testkitState ) - .thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) ) - .exceptionally( throwable -> - { - ctx.writeAndFlush( createErrorResponse( throwable ) ); - return null; - } ); + ctx.writeAndFlush( response ); } - catch ( Throwable throwable ) - { - ctx.writeAndFlush( createErrorResponse( throwable ) ); - } - } ); + return null; + } ).exceptionally( throwable -> + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + return null; + } ); + } + + private static CompletionStage wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState ) + { + CompletableFuture result = new CompletableFuture<>(); + try + { + result.complete( testkitRequest.process( testkitState ) ); + } + catch ( Throwable t ) + { + result.completeExceptionally( t ); + } + return result; } private TestkitResponse createErrorResponse( Throwable throwable ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java index 9556e4b5a6..d599257a76 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java @@ -19,17 +19,18 @@ package neo4j.org.testkit.backend.channel.handler; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import neo4j.org.testkit.backend.CommandProcessor; +import neo4j.org.testkit.backend.messages.TestkitModule; import neo4j.org.testkit.backend.messages.requests.TestkitRequest; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; public class TestkitRequestResponseMapperHandler extends ChannelDuplexHandler { - private final ObjectMapper objectMapper = CommandProcessor.newObjectMapperFor( () -> true ); + private final ObjectMapper objectMapper = newObjectMapper(); @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) @@ -54,4 +55,13 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise String responseStr = objectMapper.writeValueAsString( testkitResponse ); ctx.writeAndFlush( responseStr, promise ); } + + public static ObjectMapper newObjectMapper() + { + ObjectMapper objectMapper = new ObjectMapper(); + TestkitModule testkitModule = new TestkitModule(); + objectMapper.registerModule( testkitModule ); + objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES ); + return objectMapper; + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index 82557ee62b..7ecfecf886 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.messages.responses.MultiDBSupport; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -42,12 +41,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getDrivers().get( data.getDriverId() ) .supportsMultiDbAsync() - .thenApply( this::createResponse ) - .thenApply( Optional::of ); + .thenApply( this::createResponse ); } private MultiDBSupport createResponse( boolean available ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index a9aadc4b8a..1017e159ec 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -41,12 +40,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getDrivers().get( data.getDriverId() ) .closeAsync() - .thenApply( ignored -> createResponse() ) - .thenApply( Optional::of ); + .thenApply( ignored -> createResponse() ); } private Driver createResponse() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index d7eb69b8a9..7389f69d8d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -58,9 +57,9 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( Optional.of( createResponse( COMMON_FEATURES ) ) ); + return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) ); } private FeatureList createResponse( Set features ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index bdcaf5eda1..2448c1a45d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -77,9 +76,9 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + return CompletableFuture.completedFuture( process( testkitState ) ) ; } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 1937ddfaf0..eaddb1f147 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -18,11 +18,9 @@ */ package neo4j.org.testkit.backend.messages.requests; -import com.fasterxml.jackson.annotation.JacksonInject; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; -import neo4j.org.testkit.backend.CommandProcessor; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.BackendError; import neo4j.org.testkit.backend.messages.responses.DomainNameResolutionRequired; @@ -60,13 +58,6 @@ @Getter public class NewDriver implements TestkitRequest { - private final CommandProcessor commandProcessor; - - public NewDriver( @JacksonInject( CommandProcessor.COMMAND_PROCESSOR_ID ) CommandProcessor commandProcessor ) - { - this.commandProcessor = commandProcessor; - } - private NewDriverBody data; @Override @@ -100,7 +91,7 @@ public TestkitResponse process( TestkitState testkitState ) DomainNameResolver domainNameResolver = DefaultDomainNameResolver.getInstance(); if ( data.isDomainNameResolverRegistered() ) { - domainNameResolver = callbackDomainNameResolver( commandProcessor, testkitState ); + domainNameResolver = callbackDomainNameResolver( testkitState ); } Optional.ofNullable( data.userAgent ).ifPresent( configBuilder::withUserAgent ); Optional.ofNullable( data.connectionTimeoutMs ).ifPresent( timeout -> configBuilder.withConnectionTimeout( timeout, TimeUnit.MILLISECONDS ) ); @@ -122,9 +113,9 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + return CompletableFuture.completedFuture( process( testkitState ) ); } private ServerAddressResolver callbackResolver( TestkitState testkitState ) @@ -141,7 +132,7 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState ) ResolverResolutionRequired.builder() .data( body ) .build(); - CompletionStage c = dispatchTestkitCallback( commandProcessor, testkitState, response ); + CompletionStage c = dispatchTestkitCallback( testkitState, response ); ResolverResolutionCompleted resolutionCompleted; try { @@ -158,7 +149,7 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState ) }; } - private DomainNameResolver callbackDomainNameResolver( CommandProcessor commandProcessor, TestkitState testkitState ) + private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState ) { return address -> { @@ -173,7 +164,7 @@ private DomainNameResolver callbackDomainNameResolver( CommandProcessor commandP .data( body ) .build(); - CompletionStage callbackStage = dispatchTestkitCallback( commandProcessor, testkitState, callback ); + CompletionStage callbackStage = dispatchTestkitCallback( testkitState, callback ); DomainNameResolutionCompleted resolutionCompleted; try { @@ -202,14 +193,11 @@ private DomainNameResolver callbackDomainNameResolver( CommandProcessor commandP }; } - private CompletionStage dispatchTestkitCallback( CommandProcessor commandProcessor, TestkitState testkitState, - TestkitCallback response ) + private CompletionStage dispatchTestkitCallback( TestkitState testkitState, TestkitCallback response ) { CompletableFuture future = new CompletableFuture<>(); testkitState.getCallbackIdToFuture().put( response.getCallbackId(), future ); testkitState.getResponseWriter().accept( response ); - // This is required for sync backend, but should be removed during migration to Netty implementation. - commandProcessor.process(); return future; } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index 39d98a1bc2..889d18377c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -52,10 +52,9 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( - Optional.of( createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ) ); + return CompletableFuture.completedFuture( createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ); } private TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index 1cc74f6d57..11778a077b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -25,7 +25,6 @@ import neo4j.org.testkit.backend.messages.responses.Summary; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Result; @@ -52,12 +51,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getResultCursors().get( data.getResultId() ) .consumeAsync() - .thenApply( this::createResponse ) - .thenApply( Optional::of ); + .thenApply( this::createResponse ); } private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java index b5a50730a2..07a3e62e2e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java @@ -26,7 +26,6 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; @@ -43,12 +42,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getResultCursors().get( data.getResultId() ) .listAsync() - .thenApply( this::createResponse ) - .thenApply( Optional::of ); + .thenApply( this::createResponse ); } private RecordList createResponse( List records ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index 2d8026989e..d79e422f53 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -25,7 +25,6 @@ import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Result; @@ -52,12 +51,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getResultCursors().get( data.getResultId() ) .nextAsync() - .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ) - .thenApply( Optional::of ); + .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ); } private Record createResponse( org.neo4j.driver.Record record ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index ef75acc530..bde0bff0ac 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -25,7 +25,6 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -43,13 +42,21 @@ public TestkitResponse process( TestkitState testkitState ) { throw new RuntimeException( "Could not find session" ); } - sessionState.setRetryableState( -1 ); - sessionState.setRetryableErrorId( data.errorId ); + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionState.getTxWorkFuture().completeExceptionally( throwable ); return null; } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); Throwable throwable; @@ -62,7 +69,7 @@ public CompletionStage> processAsync( TestkitState tes throwable = new RuntimeException( "Error from client in retryable tx" ); } sessionState.getTxWorkFuture().completeExceptionally( throwable ); - return CompletableFuture.completedFuture( Optional.empty() ); + return CompletableFuture.completedFuture( null ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index af04a8f290..2f0676ea91 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -42,15 +41,15 @@ public TestkitResponse process( TestkitState testkitState ) { throw new RuntimeException( "Could not find session" ); } - sessionState.setRetryableState( 1 ); + sessionState.getTxWorkFuture().complete( null ); return null; } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { testkitState.getAsyncSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); - return CompletableFuture.completedFuture( Optional.empty() ); + return CompletableFuture.completedFuture( null ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 064a9980b7..c9a75aaa5e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.neo4j.driver.TransactionConfig; @@ -55,16 +56,13 @@ public TestkitResponse process( TestkitState testkitState ) builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); } - String txId = testkitState.newId(); - org.neo4j.driver.Transaction tx = session.beginTransaction( builder.build() ); - testkitState.getTransactions().put( txId, tx ); - return transaction( txId ); + return transaction( testkitState.addTransaction( session.beginTransaction( builder.build() ) ) ); } ) .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); if ( sessionState != null ) @@ -78,18 +76,13 @@ public CompletionStage> processAsync( TestkitState tes builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); } - String txId = testkitState.newId(); - return session.beginTransactionAsync( builder.build() ) - .thenApply( tx -> - { - testkitState.getAsyncTransactions().put( txId, tx ); - return transaction( txId ); - } ) - .thenApply( Optional::of ); + return session.beginTransactionAsync( builder.build() ).thenApply( tx -> transaction( testkitState.addAsyncTransaction( tx ) ) ); } else { - return null; + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( new RuntimeException( "Could not find session" ) ); + return future; } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index 433315453d..d942a2cff5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -41,12 +40,11 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { return testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession() .closeAsync() - .thenApply( ignored -> createResponse() ) - .thenApply( Optional::of ); + .thenApply( ignored -> createResponse() ); } private Session createResponse() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index 14db5bf067..1016a3ca41 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -51,10 +51,10 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { Bookmark bookmark = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return CompletableFuture.completedFuture( Optional.of( createResponse( bookmark ) ) ); + return CompletableFuture.completedFuture( createResponse( bookmark ) ); } private Bookmarks createResponse( Bookmark bookmark ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index d7ff9d0911..134dfd83bd 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -18,11 +18,9 @@ */ package neo4j.org.testkit.backend.messages.requests; -import com.fasterxml.jackson.annotation.JacksonInject; import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.CommandProcessor; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -32,25 +30,20 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransactionWork; +import org.neo4j.driver.exceptions.Neo4jException; @Setter @Getter public class SessionReadTransaction implements TestkitRequest { - private final CommandProcessor commandProcessor; - private SessionReadTransactionBody data; - public SessionReadTransaction( @JacksonInject( CommandProcessor.COMMAND_PROCESSOR_ID ) CommandProcessor commandProcessor ) - { - this.commandProcessor = commandProcessor; - } - @Override public TestkitResponse process( TestkitState testkitState ) { @@ -64,15 +57,14 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); AsyncSession session = sessionState.getSession(); AsyncTransactionWork> workWrapper = tx -> { - String txId = testkitState.newId(); - testkitState.getAsyncTransactions().put( txId, tx ); + String txId = testkitState.addAsyncTransaction( tx ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); CompletableFuture txWorkFuture = new CompletableFuture<>(); sessionState.setTxWorkFuture( txWorkFuture ); @@ -80,44 +72,36 @@ public CompletionStage> processAsync( TestkitState tes }; return session.readTransactionAsync( workWrapper ) - .thenApply( nothing -> retryableDone() ) - .thenApply( Optional::of ); + .thenApply( nothing -> retryableDone() ); } - private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> { - System.out.println( "Start" ); - sessionState.setRetryableState( 0 ); - String txId = testkitState.newId(); - testkitState.getTransactions().put( txId, tx ); + String txId = testkitState.addTransaction( tx ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture txWorkFuture = new CompletableFuture<>(); + sessionState.setTxWorkFuture( txWorkFuture ); - while ( true ) + try { - // Process commands as usual but blocking in here - commandProcessor.process(); - - // Check if state changed on session - switch ( sessionState.retryableState ) + return txWorkFuture.get(); + } + catch ( Throwable throwable ) + { + Throwable workThrowable = throwable; + if ( workThrowable instanceof ExecutionException ) + { + workThrowable = workThrowable.getCause(); + } + if ( workThrowable instanceof Neo4jException ) + { + throw (Neo4jException) workThrowable; + } + else { - case 0: - // Nothing happened to session state while processing command - break; - case 1: - // Client is happy to commit - return 0; - case -1: - // Client wants to rollback - if ( !"".equals( sessionState.retryableErrorId ) ) - { - throw testkitState.getErrors().get( sessionState.retryableErrorId ); - } - else - { - throw new RuntimeException( "Error from client in retryable tx" ); - } + throw new RuntimeException( "Unexpected exception occurred in transaction work function", workThrowable ); } } }; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 1ad46e4310..5122793911 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -60,7 +60,7 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { AsyncSession session = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession(); Query query = Optional.ofNullable( data.params ) @@ -76,8 +76,7 @@ public CompletionStage> processAsync( TestkitState tes String newId = testkitState.newId(); testkitState.getResultCursors().put( newId, resultCursor ); return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); - } ) - .thenApply( Optional::of ); + } ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index f7516f3eb4..4ceeac6a29 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -18,11 +18,9 @@ */ package neo4j.org.testkit.backend.messages.requests; -import com.fasterxml.jackson.annotation.JacksonInject; import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.CommandProcessor; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -33,25 +31,20 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransactionWork; +import org.neo4j.driver.exceptions.Neo4jException; @Setter @Getter public class SessionWriteTransaction implements TestkitRequest { - private final CommandProcessor commandProcessor; - private SessionWriteTransactionBody data; - public SessionWriteTransaction( @JacksonInject( CommandProcessor.COMMAND_PROCESSOR_ID ) CommandProcessor commandProcessor ) - { - this.commandProcessor = commandProcessor; - } - @Override public TestkitResponse process( TestkitState testkitState ) { @@ -65,7 +58,7 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); AsyncSession session = sessionState.getSession(); @@ -73,8 +66,7 @@ public CompletionStage> processAsync( TestkitState tes AsyncTransactionWork> workWrapper = tx -> { - String txId = testkitState.newId(); - testkitState.getAsyncTransactions().put( txId, tx ); + String txId = testkitState.addAsyncTransaction( tx ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); CompletableFuture tryResult = new CompletableFuture<>(); sessionState.setTxWorkFuture( tryResult ); @@ -82,44 +74,36 @@ public CompletionStage> processAsync( TestkitState tes }; return session.writeTransactionAsync( workWrapper ) - .thenApply( nothing -> retryableDone() ) - .thenApply( Optional::of ); + .thenApply( nothing -> retryableDone() ); } - private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> { - System.out.println( "Start" ); - sessionState.setRetryableState( 0 ); - String txId = testkitState.newId(); - testkitState.getTransactions().put( txId, tx ); + String txId = testkitState.addTransaction( tx ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture txWorkFuture = new CompletableFuture<>(); + sessionState.setTxWorkFuture( txWorkFuture ); - while ( true ) + try { - // Process commands as usual but blocking in here - commandProcessor.process(); - - // Check if state changed on session - switch ( sessionState.retryableState ) + return txWorkFuture.get(); + } + catch ( Throwable throwable ) + { + Throwable workThrowable = throwable; + if ( workThrowable instanceof ExecutionException ) + { + workThrowable = workThrowable.getCause(); + } + if ( workThrowable instanceof Neo4jException ) + { + throw (Neo4jException) workThrowable; + } + else { - case 0: - // Nothing happened to session state while processing command - break; - case 1: - // Client is happy to commit - return 0; - case -1: - // Client wants to rollback - if ( !"".equals( sessionState.retryableErrorId ) ) - { - throw testkitState.getErrors().get( sessionState.retryableErrorId ); - } - else - { - throw new RuntimeException( "Error from client in retryable tx" ); - } + throw new RuntimeException( "Unexpected exception occurred in transaction work function", workThrowable ); } } }; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 0e70291144..ed701746c8 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -51,7 +50,7 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { TestkitResponse testkitResponse = ASYNC_SKIP_PATTERN_TO_REASON .entrySet() @@ -65,7 +64,7 @@ public CompletionStage> processAsync( TestkitState tes .build() ) .orElseGet( () -> RunTest.builder().build() ); - return CompletableFuture.completedFuture( Optional.of( testkitResponse ) ); + return CompletableFuture.completedFuture( testkitResponse ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java index 7a4cfbba6a..b622c21b97 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java @@ -22,7 +22,6 @@ import neo4j.org.testkit.backend.messages.responses.TestkitCallback; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -41,9 +40,9 @@ default TestkitResponse process( TestkitState testkitState ) } @Override - default CompletionStage> processAsync( TestkitState testkitState ) + default CompletionStage processAsync( TestkitState testkitState ) { testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this ); - return CompletableFuture.completedFuture( Optional.empty() ); + return CompletableFuture.completedFuture( null ); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index fb3c54c97d..e9348fd94e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -23,7 +23,6 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, property = "name" ) @@ -46,5 +45,5 @@ public interface TestkitRequest { TestkitResponse process( TestkitState testkitState ); - CompletionStage> processAsync( TestkitState testkitState ); + CompletionStage processAsync( TestkitState testkitState ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index 906bf580e9..649d69cbe3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -36,17 +35,12 @@ public class TransactionClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) - .map( tx -> - { - tx.close(); - return createResponse( data.getTxId() ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); + testkitState.getTransaction( data.getTxId() ).close(); + return createResponse( data.getTxId() ); } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { throw new UnsupportedOperationException(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index a692c4d662..7cbc704157 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -24,11 +24,9 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Getter - @Setter public class TransactionCommit implements TestkitRequest { @@ -37,22 +35,14 @@ public class TransactionCommit implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) - .map( tx -> - { - tx.commit(); - return createResponse( data.getTxId() ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); + testkitState.getTransaction( data.getTxId() ).commit(); + return createResponse( data.getTxId() ); } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransactions().get( data.getTxId() ) - .commitAsync() - .thenApply( ignored -> createResponse( data.getTxId() ) ) - .thenApply( Optional::of ); + return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( tx -> tx.commitAsync() ).thenApply( ignored -> createResponse( data.getTxId() ) ); } private Transaction createResponse( String txId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index e010942288..f9be772df5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -24,11 +24,9 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Getter - @Setter public class TransactionRollback implements TestkitRequest { @@ -37,22 +35,14 @@ public class TransactionRollback implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) - .map( tx -> - { - tx.rollback(); - return createResponse( data.getTxId() ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); + testkitState.getTransaction( data.getTxId() ).rollback(); + return createResponse( data.getTxId() ); } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransactions().get( data.getTxId() ) - .rollbackAsync() - .thenApply( ignored -> createResponse( data.getTxId() ) ) - .thenApply( Optional::of ); + return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( tx -> tx.rollbackAsync() ).thenApply( ignored -> createResponse( data.getTxId() ) ); } private Transaction createResponse( String txId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index 029fb15a11..d25bb2cbaa 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -28,7 +28,6 @@ import java.util.Collections; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -40,30 +39,24 @@ public class TransactionRun implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) - .map( tx -> - tx.run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) - .map( result -> - { - String resultId = testkitState.newId(); - testkitState.getResults().put( resultId, result ); - return createResponse( resultId ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); + org.neo4j.driver.Result result = + testkitState.getTransaction( data.getTxId() ).run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ); + String resultId = testkitState.newId(); + testkitState.getResults().put( resultId, result ); + return createResponse( resultId ); } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransactions().get( data.getTxId() ) - .runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) - .thenApply( resultCursor -> - { - String resultId = testkitState.newId(); - testkitState.getResultCursors().put( resultId, resultCursor ); - return createResponse( resultId ); - } ) - .thenApply( Optional::of ); + return testkitState.getAsyncTransaction( data.getTxId() ) + .thenCompose( tx -> tx.runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() )) + .thenApply( resultCursor -> + { + String resultId = testkitState.newId(); + testkitState.getResultCursors().put( resultId, resultCursor ); + return createResponse( resultId ); + } ) ; } private Result createResponse( String resultId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index b157907221..3cc2ba28ba 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -24,7 +24,6 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.Optional; import java.util.concurrent.CompletionStage; @Setter @@ -42,13 +41,12 @@ public TestkitResponse process( TestkitState testkitState ) } @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public CompletionStage processAsync( TestkitState testkitState ) { String id = data.getDriverId(); return testkitState.getDrivers().get( id ) .verifyConnectivityAsync() - .thenApply( ignored -> createResponse( id ) ) - .thenApply( Optional::of ); + .thenApply( ignored -> createResponse( id ) ); } private Driver createResponse( String id ) diff --git a/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageDeserializerTest.java b/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageDeserializerTest.java index 477f7c4abf..81f309d469 100644 --- a/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageDeserializerTest.java +++ b/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageDeserializerTest.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; import neo4j.org.testkit.backend.messages.requests.NewDriver; import neo4j.org.testkit.backend.messages.requests.NewSession; import neo4j.org.testkit.backend.messages.requests.SessionRun; @@ -32,7 +33,7 @@ class MessageDeserializerTest { - private static final ObjectMapper mapper = CommandProcessor.newObjectMapperFor( () -> false ); + private static final ObjectMapper mapper = TestkitRequestResponseMapperHandler.newObjectMapper(); @Test void testDeserializeNewDriver() throws JsonProcessingException diff --git a/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageSerializerTest.java b/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageSerializerTest.java index b898f44b21..6e0311d784 100644 --- a/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageSerializerTest.java +++ b/testkit-backend/src/test/java/neo4j/org/testkit/backend/MessageSerializerTest.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; import neo4j.org.testkit.backend.messages.responses.Driver; import org.junit.jupiter.api.Test; @@ -28,7 +29,7 @@ public class MessageSerializerTest { - private static final ObjectMapper mapper = CommandProcessor.newObjectMapperFor( () -> false ); + private static final ObjectMapper mapper = TestkitRequestResponseMapperHandler.newObjectMapper(); @Test void shouldSerializerNewDriverResponse() throws JsonProcessingException