From 1f68f5da3e6180d9cadb1f0b9c05a297eb394955 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 12 Oct 2017 12:00:48 +0200 Subject: [PATCH 1/4] Mute ACK_FAILURE when sending RESET It's needed because RESET makes server terminate all running messages and then moves state machine to idle READY state. That's why it does not expect clients to ACK_FAILURE for terminated messages. Absence of this muting caused server to sometimes fail saying: "ACK_FAILURE cannot be handled by a session in the READY state". Also removed some unused blocking API classes and better naming for connection methods. --- .../internal/DirectConnectionProvider.java | 2 +- .../driver/internal/ExplicitTransaction.java | 2 +- .../neo4j/driver/internal/NetworkSession.java | 3 +- .../async/HandshakeCompletedListener.java | 4 +- .../internal/async/NettyConnection.java | 33 ++++--- .../internal/async/RoutingConnection.java | 8 +- .../inbound/InboundMessageDispatcher.java | 47 +++++++++- .../handlers/AsyncInitResponseHandler.java | 93 ------------------- .../handlers/InitResponseHandler.java | 51 ++++++++-- .../handlers/ResetAsyncResponseHandler.java | 50 ---------- .../ResetResponseHandler.java} | 23 +++-- .../SessionPullAllResponseHandler.java | 4 +- .../neo4j/driver/internal/spi/Connection.java | 4 +- .../internal/ExplicitTransactionTest.java | 6 +- .../driver/internal/NetworkSessionTest.java | 14 +-- .../async/HandshakeCompletedListenerTest.java | 6 +- .../internal/async/NettyConnectionTest.java | 4 +- .../async/pool/ConnectionPoolImplTest.java | 4 +- ...Test.java => InitResponseHandlerTest.java} | 12 +-- .../ResetResponseHandlerTest.java} | 46 ++++++--- .../v1/integration/ConnectionHandlingIT.java | 42 ++++----- 21 files changed, 210 insertions(+), 248 deletions(-) delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/handlers/ResetAsyncResponseHandler.java rename driver/src/main/java/org/neo4j/driver/internal/{async/ReleaseChannelHandler.java => handlers/ResetResponseHandler.java} (70%) rename driver/src/test/java/org/neo4j/driver/internal/handlers/{AsyncInitResponseHandlerTest.java => InitResponseHandlerTest.java} (90%) rename driver/src/test/java/org/neo4j/driver/internal/{async/ReleaseChannelHandlerTest.java => handlers/ResetResponseHandlerTest.java} (64%) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index 2d1cd7e6aa..bd2de5282e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -52,7 +52,7 @@ public CompletionStage acquireConnection( AccessMode mode ) @Override public CompletionStage verifyConnectivity() { - return acquireConnection( READ ).thenCompose( Connection::forceRelease ); + return acquireConnection( READ ).thenCompose( Connection::releaseNow ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 7739f1fc92..25ddd09bbb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -192,7 +192,7 @@ private BiConsumer transactionClosed( State newState ) { // todo: test that this state transition always happens when commit or rollback state = newState; - connection.release(); + connection.releaseInBackground(); session.setBookmark( bookmark ); }; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index bec2d47859..2f17f3ba7e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -492,11 +492,12 @@ private CompletionStage forceReleaseConnection() { if ( connection != null ) { - return connection.forceRelease(); + return connection.releaseNow(); } return completedFuture( null ); } ).exceptionally( error -> { + // todo: this log message looks wrong, should it go to #rollbackTransaction() ? logger.error( "Failed to rollback active transaction", error ); return null; } ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java index b5ec6257ed..0d765f577a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java @@ -25,7 +25,7 @@ import java.util.Map; -import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler; +import org.neo4j.driver.internal.handlers.InitResponseHandler; import org.neo4j.driver.internal.messaging.InitMessage; import org.neo4j.driver.v1.Value; @@ -54,7 +54,7 @@ public void operationComplete( ChannelFuture future ) Channel channel = future.channel(); InitMessage message = new InitMessage( userAgent, authToken ); - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise ); + InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise ); messageDispatcher( channel ).queue( handler ); channel.writeAndFlush( message, channel.voidPromise() ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index 87a011c6b1..ffdb0e1165 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.handlers.ResetResponseHandler; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.PullAllMessage; import org.neo4j.driver.internal.messaging.ResetMessage; @@ -61,6 +62,12 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) this.clock = clock; } + @Override + public boolean isInUse() + { + return state.isInUse(); + } + @Override public boolean tryMarkInUse() { @@ -99,29 +106,22 @@ public void runAndFlush( String statement, Map parameters, Respons run( statement, parameters, runHandler, pullAllHandler, true ); } - // TODO change this to return future or say that it does stuff in the background in the method name? @Override - public void release() + public void releaseInBackground() { if ( state.release() ) { - reset( new ReleaseChannelHandler( channel, channelPool, clock ) ); + reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) ); } } @Override - public boolean isInUse() - { - return state.isInUse(); - } - - @Override - public CompletionStage forceRelease() + public CompletionStage releaseNow() { if ( state.forceRelease() ) { Promise releasePromise = channel.eventLoop().newPromise(); - reset( new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ) ); + reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) ); return asCompletionStage( releasePromise ); } else @@ -151,7 +151,11 @@ private void run( String statement, Map parameters, ResponseHandle private void reset( ResponseHandler resetHandler ) { - writeAndFlushMessageInEventLoop( ResetMessage.RESET, resetHandler ); + channel.eventLoop().execute( () -> + { + messageDispatcher.muteAckFailure(); + writeAndFlushMessage( ResetMessage.RESET, resetHandler ); + } ); } private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2, @@ -160,11 +164,6 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler channel.eventLoop().execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) ); } - private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler ) - { - channel.eventLoop().execute( () -> writeAndFlushMessage( message, handler ) ); - } - private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2, boolean flush ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index bcd3e7468d..b2753c486e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -76,9 +76,9 @@ public void runAndFlush( String statement, Map parameters, Respons } @Override - public void release() + public void releaseInBackground() { - delegate.release(); + delegate.releaseInBackground(); } @Override @@ -88,9 +88,9 @@ public boolean isInUse() } @Override - public CompletionStage forceRelease() + public CompletionStage releaseNow() { - return delegate.forceRelease(); + return delegate.releaseNow(); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index af6e594f4e..6e07f194ed 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -26,7 +26,6 @@ import java.util.Queue; import org.neo4j.driver.internal.handlers.AckFailureResponseHandler; -import org.neo4j.driver.internal.messaging.AckFailureMessage; import org.neo4j.driver.internal.messaging.MessageHandler; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.ErrorUtil; @@ -35,6 +34,7 @@ import org.neo4j.driver.v1.Value; import static java.util.Objects.requireNonNull; +import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE; public class InboundMessageDispatcher implements MessageHandler { @@ -44,6 +44,7 @@ public class InboundMessageDispatcher implements MessageHandler private Throwable currentError; private boolean fatalErrorOccurred; + private boolean ackFailureMuted; public InboundMessageDispatcher( Channel channel, Logging logging ) { @@ -133,9 +134,8 @@ public void handleFailureMessage( String code, String message ) log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message ); currentError = ErrorUtil.newNeo4jError( code, message ); - // queue ACK_FAILURE before notifying the next response handler - queue( new AckFailureResponseHandler( this ) ); - channel.writeAndFlush( AckFailureMessage.ACK_FAILURE, channel.voidPromise() ); + // try to write ACK_FAILURE before notifying the next response handler + ackFailureIfNeeded(); ResponseHandler handler = handlers.remove(); handler.onFailure( currentError ); @@ -179,4 +179,43 @@ public Throwable currentError() { return currentError; } + + /** + * Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using + * {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed only when sending RESET message. RESET "jumps" + * over all queued messages on server and makes them fail. Received failures do not need to be acknowledge because + * RESET moves server's state machine to READY state. + *

+ * This method is not thread-safe and should only be executed by the event loop thread. + */ + public void muteAckFailure() + { + ackFailureMuted = true; + } + + /** + * Makes this message dispatcher send ACK_FAILURE in response to FAILURE. Should be used in combination with + * {@link #muteAckFailure()} when sending RESET message. + *

+ * This method is not thread-safe and should only be executed by the event loop thread. + * + * @throws IllegalStateException if ACK_FAILURE is not muted right now. + */ + public void unMuteAckFailure() + { + if ( !ackFailureMuted ) + { + throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" ); + } + ackFailureMuted = false; + } + + private void ackFailureIfNeeded() + { + if ( !ackFailureMuted ) + { + queue( new AckFailureResponseHandler( this ) ); + channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java deleted file mode 100644 index eaabb497a9..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.handlers; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; - -import java.util.Map; - -import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler; -import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.ServerVersion; -import org.neo4j.driver.v1.Value; - -import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion; - -public class AsyncInitResponseHandler implements ResponseHandler -{ - private final ChannelPromise connectionInitializedPromise; - private final Channel channel; - - public AsyncInitResponseHandler( ChannelPromise connectionInitializedPromise ) - { - this.connectionInitializedPromise = connectionInitializedPromise; - this.channel = connectionInitializedPromise.channel(); - } - - @Override - public void onSuccess( Map metadata ) - { - try - { - ServerVersion serverVersion = extractServerVersion( metadata ); - setServerVersion( channel, serverVersion ); - updatePipelineIfNeeded( serverVersion, channel.pipeline() ); - connectionInitializedPromise.setSuccess(); - } - catch ( Throwable error ) - { - connectionInitializedPromise.setFailure( error ); - throw error; - } - } - - @Override - public void onFailure( Throwable error ) - { - channel.close().addListener( future -> connectionInitializedPromise.setFailure( error ) ); - } - - @Override - public void onRecord( Value[] fields ) - { - throw new UnsupportedOperationException(); - } - - private static ServerVersion extractServerVersion( Map metadata ) - { - Value versionValue = metadata.get( "server" ); - boolean versionAbsent = versionValue == null || versionValue.isNull(); - return versionAbsent ? ServerVersion.v3_0_0 : ServerVersion.version( versionValue.asString() ); - } - - private static void updatePipelineIfNeeded( ServerVersion serverVersion, ChannelPipeline pipeline ) - { - if ( serverVersion.lessThan( ServerVersion.v3_2_0 ) ) - { - OutboundMessageHandler outboundHandler = pipeline.get( OutboundMessageHandler.class ); - if ( outboundHandler == null ) - { - throw new IllegalStateException( "Can't find " + OutboundMessageHandler.NAME + " in the pipeline" ); - } - pipeline.replace( outboundHandler, OutboundMessageHandler.NAME, outboundHandler.withoutByteArraySupport() ); - } - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java index c055d38638..a4cfb59197 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java @@ -18,37 +18,76 @@ */ package org.neo4j.driver.internal.handlers; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; + import java.util.Map; +import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Value; +import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion; + public class InitResponseHandler implements ResponseHandler { - private String serverVersion; + private final ChannelPromise connectionInitializedPromise; + private final Channel channel; + + public InitResponseHandler( ChannelPromise connectionInitializedPromise ) + { + this.connectionInitializedPromise = connectionInitializedPromise; + this.channel = connectionInitializedPromise.channel(); + } @Override public void onSuccess( Map metadata ) { - Value versionValue = metadata.get( "server" ); - if ( versionValue != null ) + try + { + ServerVersion serverVersion = extractServerVersion( metadata ); + setServerVersion( channel, serverVersion ); + updatePipelineIfNeeded( serverVersion, channel.pipeline() ); + connectionInitializedPromise.setSuccess(); + } + catch ( Throwable error ) { - serverVersion = versionValue.asString(); + connectionInitializedPromise.setFailure( error ); + throw error; } } @Override public void onFailure( Throwable error ) { + channel.close().addListener( future -> connectionInitializedPromise.setFailure( error ) ); } @Override public void onRecord( Value[] fields ) { + throw new UnsupportedOperationException(); } - public String serverVersion() + private static ServerVersion extractServerVersion( Map metadata ) { - return serverVersion; + Value versionValue = metadata.get( "server" ); + boolean versionAbsent = versionValue == null || versionValue.isNull(); + return versionAbsent ? ServerVersion.v3_0_0 : ServerVersion.version( versionValue.asString() ); + } + + private static void updatePipelineIfNeeded( ServerVersion serverVersion, ChannelPipeline pipeline ) + { + if ( serverVersion.lessThan( ServerVersion.v3_2_0 ) ) + { + OutboundMessageHandler outboundHandler = pipeline.get( OutboundMessageHandler.class ); + if ( outboundHandler == null ) + { + throw new IllegalStateException( "Can't find " + OutboundMessageHandler.NAME + " in the pipeline" ); + } + pipeline.replace( outboundHandler, OutboundMessageHandler.NAME, outboundHandler.withoutByteArraySupport() ); + } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetAsyncResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetAsyncResponseHandler.java deleted file mode 100644 index 484e08bc51..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetAsyncResponseHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.handlers; - -import java.util.Map; - -import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.v1.Value; - -public class ResetAsyncResponseHandler implements ResponseHandler -{ - private final Runnable successCallback; - - public ResetAsyncResponseHandler( Runnable successCallback ) - { - this.successCallback = successCallback; - } - - @Override - public void onSuccess( Map metadata ) - { - successCallback.run(); - } - - @Override - public void onFailure( Throwable error ) - { - } - - @Override - public void onRecord( Value[] fields ) - { - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ReleaseChannelHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java similarity index 70% rename from driver/src/main/java/org/neo4j/driver/internal/async/ReleaseChannelHandler.java rename to driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index 3cba57577a..2247fbf48e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ReleaseChannelHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.neo4j.driver.internal.async; +package org.neo4j.driver.internal.handlers; import io.netty.channel.Channel; import io.netty.channel.pool.ChannelPool; @@ -24,30 +24,34 @@ import java.util.Map; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.Value; -import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; -public class ReleaseChannelHandler implements ResponseHandler +public class ResetResponseHandler implements ResponseHandler { private final Channel channel; private final ChannelPool pool; + private final InboundMessageDispatcher messageDispatcher; private final Clock clock; private final Promise releasePromise; - public ReleaseChannelHandler( Channel channel, ChannelPool pool, Clock clock ) + public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, + Clock clock ) { - this( channel, pool, clock, null ); + this( channel, pool, messageDispatcher, clock, null ); } - public ReleaseChannelHandler( Channel channel, ChannelPool pool, Clock clock, Promise releasePromise ) + public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, + Clock clock, Promise releasePromise ) { - this.channel = requireNonNull( channel ); - this.pool = requireNonNull( pool ); - this.clock = requireNonNull( clock ); + this.channel = channel; + this.pool = pool; + this.messageDispatcher = messageDispatcher; + this.clock = clock; this.releasePromise = releasePromise; } @@ -71,6 +75,7 @@ public void onRecord( Value[] fields ) private void releaseChannel() { + messageDispatcher.unMuteAckFailure(); setLastUsedTimestamp( channel, clock.millis() ); if ( releasePromise == null ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java index 426455cfa3..0710881618 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java @@ -32,12 +32,12 @@ public SessionPullAllResponseHandler( Statement statement, RunResponseHandler ru @Override protected void afterSuccess() { - connection.release(); + connection.releaseInBackground(); } @Override protected void afterFailure( Throwable error ) { - connection.release(); + connection.releaseInBackground(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 577a6d9cf5..66e988062a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -41,9 +41,9 @@ void run( String statement, Map parameters, ResponseHandler runHan void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ); - void release(); + void releaseInBackground(); - CompletionStage forceRelease(); + CompletionStage releaseNow(); BoltServerAddress serverAddress(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 30fdd049c1..8a8c48d0d6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -52,7 +52,7 @@ public void shouldRollbackOnImplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).release(); + order.verify( connection ).releaseInBackground(); } @Test @@ -71,7 +71,7 @@ public void shouldRollbackOnExplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).release(); + order.verify( connection ).releaseInBackground(); } @Test @@ -89,7 +89,7 @@ public void shouldCommitOnSuccess() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "COMMIT" ), any(), any(), any() ); - order.verify( connection ).release(); + order.verify( connection ).releaseInBackground(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 57c0ddc37a..06053347f9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -87,7 +87,7 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.forceRelease() ).thenReturn( completedFuture( null ) ); + when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) .thenReturn( completedFuture( connection ) ); @@ -255,7 +255,7 @@ public void forceReleasesOpenConnectionUsedForRunWhenSessionIsClosed() InOrder inOrder = inOrder( connection ); inOrder.verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); - inOrder.verify( connection ).forceRelease(); + inOrder.verify( connection ).releaseNow(); } @SuppressWarnings( "deprecation" ) @@ -358,7 +358,7 @@ public void releasesConnectionWhenTxIsClosed() verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); tx.close(); - verify( connection ).release(); + verify( connection ).releaseInBackground(); } @Test @@ -574,12 +574,12 @@ public void connectionShouldBeReleasedAfterSessionReset() NetworkSession session = newSession( connectionProvider, READ ); session.run( "RETURN 1" ); - verify( connection, never() ).release(); - verify( connection, never() ).forceRelease(); + verify( connection, never() ).releaseInBackground(); + verify( connection, never() ).releaseNow(); session.reset(); - verify( connection, never() ).release(); - verify( connection ).forceRelease(); + verify( connection, never() ).releaseInBackground(); + verify( connection ).releaseNow(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java index f6c54c1756..9de415b6ce 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java @@ -28,7 +28,7 @@ import java.util.Map; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; -import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler; +import org.neo4j.driver.internal.handlers.InitResponseHandler; import org.neo4j.driver.internal.messaging.InitMessage; import org.neo4j.driver.v1.Value; @@ -94,7 +94,7 @@ public void shouldWriteInitMessageWhenHandshakeCompleted() listener.operationComplete( handshakeCompletedPromise ); assertTrue( channel.finish() ); - verify( messageDispatcher ).queue( any( AsyncInitResponseHandler.class ) ); + verify( messageDispatcher ).queue( any( InitResponseHandler.class ) ); Object outboundMessage = channel.readOutbound(); assertThat( outboundMessage, instanceOf( InitMessage.class ) ); InitMessage initMessage = (InitMessage) outboundMessage; @@ -106,7 +106,7 @@ private static Map authToken() { Map authToken = new HashMap<>(); authToken.put( "username", value( "neo4j" ) ); - authToken.put( "username", value( "secret" ) ); + authToken.put( "password", value( "secret" ) ); return authToken; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index 4bac90376c..6f4e87af66 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -80,13 +80,13 @@ public void shouldWriteRunAndFlushInEventLoopThread() throws Exception @Test public void shouldWriteReleaseInEventLoopThread() throws Exception { - testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::release ); + testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::releaseInBackground ); } @Test public void shouldWriteForceReleaseInEventLoopThread() throws Exception { - testWriteInEventLoop( "ForceReleaseTestEventLoop", NettyConnection::forceRelease ); + testWriteInEventLoop( "ReleaseNowTestEventLoop", NettyConnection::releaseNow ); } private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java index ad3fd666fe..1954fbb634 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java @@ -77,7 +77,7 @@ public void shouldAcquireConnectionWhenPoolIsEmpty() throws Exception public void shouldAcquireIdleConnection() throws Exception { Connection connection1 = await( pool.acquire( neo4j.address() ) ); - await( connection1.forceRelease() ); + await( connection1.releaseNow() ); Connection connection2 = await( pool.acquire( neo4j.address() ) ); assertNotNull( connection2 ); @@ -102,7 +102,7 @@ public void shouldFailToAcquireConnectionToWrongAddress() throws Exception public void shouldFailToAcquireWhenPoolClosed() throws Exception { Connection connection = await( pool.acquire( neo4j.address() ) ); - await( connection.forceRelease() ); + await( connection.releaseNow() ); await( pool.close() ); try diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java similarity index 90% rename from driver/src/test/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandlerTest.java rename to driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java index 8d47beb84f..d9a24d1c5a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java @@ -50,7 +50,7 @@ import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.Values.value; -public class AsyncInitResponseHandlerTest +public class InitResponseHandlerTest { private final EmbeddedChannel channel = new EmbeddedChannel(); @@ -67,7 +67,7 @@ public void setUp() public void shouldSetServerVersionOnChannel() { ChannelPromise channelPromise = channel.newPromise(); - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( channelPromise ); + InitResponseHandler handler = new InitResponseHandler( channelPromise ); Map metadata = singletonMap( "server", value( ServerVersion.v3_2_0.toString() ) ); handler.onSuccess( metadata ); @@ -80,7 +80,7 @@ public void shouldSetServerVersionOnChannel() public void shouldSetServerVersionToDefaultValueWhenUnknown() { ChannelPromise channelPromise = channel.newPromise(); - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( channelPromise ); + InitResponseHandler handler = new InitResponseHandler( channelPromise ); Map metadata = singletonMap( "server", Values.NULL ); handler.onSuccess( metadata ); @@ -92,7 +92,7 @@ public void shouldSetServerVersionToDefaultValueWhenUnknown() @Test public void shouldAllowByteArraysForNewerVersions() { - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( channel.newPromise() ); + InitResponseHandler handler = new InitResponseHandler( channel.newPromise() ); Map metadata = singletonMap( "server", value( ServerVersion.v3_2_0.toString() ) ); handler.onSuccess( metadata ); @@ -105,7 +105,7 @@ public void shouldAllowByteArraysForNewerVersions() @Test public void shouldNotAllowByteArraysForOldVersions() { - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( channel.newPromise() ); + InitResponseHandler handler = new InitResponseHandler( channel.newPromise() ); Map metadata = singletonMap( "server", value( ServerVersion.v3_0_0.toString() ) ); handler.onSuccess( metadata ); @@ -126,7 +126,7 @@ public void shouldNotAllowByteArraysForOldVersions() public void shouldCloseChannelOnFailure() throws Exception { ChannelPromise channelPromise = channel.newPromise(); - AsyncInitResponseHandler handler = new AsyncInitResponseHandler( channelPromise ); + InitResponseHandler handler = new InitResponseHandler( channelPromise ); RuntimeException error = new RuntimeException( "Hi!" ); handler.onFailure( error ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ReleaseChannelHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java similarity index 64% rename from driver/src/test/java/org/neo4j/driver/internal/async/ReleaseChannelHandlerTest.java rename to driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index 9812fdd2d0..f55306fe2d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ReleaseChannelHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.neo4j.driver.internal.async; +package org.neo4j.driver.internal.handlers; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.pool.ChannelPool; @@ -24,22 +24,22 @@ import org.junit.After; import org.junit.Test; -import java.util.Collections; - +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.util.FakeClock; -import org.neo4j.driver.v1.Value; +import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; -public class ReleaseChannelHandlerTest +public class ResetResponseHandlerTest { private final EmbeddedChannel channel = new EmbeddedChannel(); + private final InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); @After - public void tearDown() throws Exception + public void tearDown() { channel.close(); } @@ -50,9 +50,9 @@ public void shouldReleaseChannelOnSuccess() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 5 ); - ReleaseChannelHandler handler = new ReleaseChannelHandler( channel, pool, clock ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); - handler.onSuccess( Collections.emptyMap() ); + handler.onSuccess( emptyMap() ); verifyLastUsedTimestamp( 5 ); verify( pool ).release( channel ); @@ -65,9 +65,9 @@ public void shouldReleaseChannelWithPromiseOnSuccess() FakeClock clock = new FakeClock(); clock.progress( 42 ); Promise promise = channel.newPromise(); - ReleaseChannelHandler handler = new ReleaseChannelHandler( channel, pool, clock, promise ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); - handler.onSuccess( Collections.emptyMap() ); + handler.onSuccess( emptyMap() ); verifyLastUsedTimestamp( 42 ); verify( pool ).release( channel, promise ); @@ -79,7 +79,7 @@ public void shouldReleaseChannelOnFailure() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 100 ); - ReleaseChannelHandler handler = new ReleaseChannelHandler( channel, pool, clock ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); handler.onFailure( new RuntimeException() ); @@ -94,7 +94,7 @@ public void shouldReleaseChannelWithPromiseOnFailure() FakeClock clock = new FakeClock(); clock.progress( 99 ); Promise promise = channel.newPromise(); - ReleaseChannelHandler handler = new ReleaseChannelHandler( channel, pool, clock, promise ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); handler.onFailure( new RuntimeException() ); @@ -102,6 +102,28 @@ public void shouldReleaseChannelWithPromiseOnFailure() verify( pool ).release( channel, promise ); } + @Test + public void shouldUnMuteAckFailureOnSuccess() + { + ChannelPool pool = mock( ChannelPool.class ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + + handler.onSuccess( emptyMap() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + + @Test + public void shouldUnMuteAckFailureOnFailure() + { + ChannelPool pool = mock( ChannelPool.class ); + ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + + handler.onFailure( new RuntimeException() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + private void verifyLastUsedTimestamp( int expectedValue ) { assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java index 29df4b0e54..a123236cdd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java @@ -101,13 +101,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultConsumed() StatementResult result = createNodesInNewSession( 12 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); result.consume(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -116,14 +116,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultSummaryObtaine StatementResult result = createNodesInNewSession( 5 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); ResultSummary summary = result.summary(); assertEquals( 5, summary.counters().nodesCreated() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -132,14 +132,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedInList( StatementResult result = createNodesInNewSession( 2 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); List records = result.list(); assertEquals( 2, records.size() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -148,13 +148,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenSingleRecordFetched( StatementResult result = createNodesInNewSession( 1 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); assertNotNull( result.single() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -163,7 +163,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera StatementResult result = createNodesInNewSession( 6 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); int seenRecords = 0; while ( result.hasNext() ) @@ -175,7 +175,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -186,7 +186,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes StatementResult result = session.run( "UNWIND range(10, 0, -1) AS i CREATE (n {index: 10/i}) RETURN n" ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); try { @@ -200,7 +200,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -217,7 +217,7 @@ public void activeConnectionFromSessionRunCanBeReusedForNextSessionRun() assertEquals( 2, result2.list().size() ); verify( connection1 ).tryMarkInUse(); - verify( connection1, times( 2 ) ).release(); + verify( connection1, times( 2 ) ).releaseInBackground(); } @Test @@ -233,7 +233,7 @@ public void activeConnectionFromSessionRunCanBeReusedForNewTransaction() assertEquals( 3, result1.list().size() ); verify( connection1 ).tryMarkInUse(); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); } @Test @@ -244,7 +244,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); StatementResult result = createNodes( 5, tx ); tx.success(); @@ -252,7 +252,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); assertEquals( 5, result.list().size() ); } @@ -265,7 +265,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).release(); + verify( connection1, never() ).releaseInBackground(); StatementResult result = createNodes( 8, tx ); tx.failure(); @@ -273,7 +273,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).release(); + verify( connection1 ).releaseInBackground(); assertEquals( 8, result.list().size() ); } @@ -287,12 +287,12 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1 ).release(); // connection used for constraint creation + verify( connection1 ).releaseInBackground(); // connection used for constraint creation Session session = driver.session(); Transaction tx = session.beginTransaction(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; - verify( connection2, never() ).release(); + verify( connection2, never() ).releaseInBackground(); // property existence constraints are verified on commit, try to violate it tx.run( "CREATE (:Book)" ); @@ -309,7 +309,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } // connection should have been released after failed node creation - verify( connection2 ).release(); + verify( connection2 ).releaseInBackground(); } private StatementResult createNodesInNewSession( int nodesToCreate ) From d5ba93a3e49becce92e35385dd28b1f9ae29a614 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 12 Oct 2017 18:11:13 +0200 Subject: [PATCH 2/4] Fixed Session#reset() It got broken after moving all blocking APIs to use async ones underneath. This commit makes session send RESET message on the active connection and move existing transaction to TERMINATED state. Transactions in this state will disallow running new queries and committing. All existing tests for `Session#reset()` are now un-ignored and couple new unit tests are added. --- .../driver/internal/ExplicitTransaction.java | 59 +++-- .../neo4j/driver/internal/NetworkSession.java | 97 +++++---- .../TransactionPullAllResponseHandler.java | 2 +- .../neo4j/driver/internal/util/ErrorUtil.java | 1 + .../internal/ExplicitTransactionTest.java | 10 +- .../SessionPullAllResponseHandlerTest.java | 70 ++++++ ...TransactionPullAllResponseHandlerTest.java | 49 +++++ .../driver/v1/integration/SessionIT.java | 54 +++-- .../v1/integration/TransactionAsyncIT.java | 202 +++++++++++++++++- .../driver/v1/integration/TransactionIT.java | 10 +- 10 files changed, 441 insertions(+), 113 deletions(-) create mode 100644 driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 25ddd09bbb..6b2f9c7fda 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -31,6 +31,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.StatementResultCursor; @@ -42,7 +43,6 @@ import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.v1.Values.value; @@ -65,10 +65,9 @@ private enum State MARKED_FAILED, /** - * An error has occurred, transaction can no longer be used and no more messages will be sent for this - * transaction. + * This transaction has been explicitly terminated by calling {@link Session#reset()}. */ - FAILED, + TERMINATED, /** This transaction has successfully committed */ COMMITTED, @@ -135,17 +134,10 @@ CompletionStage closeAsync() { return commitAsync(); } - else if ( state == State.MARKED_FAILED || state == State.ACTIVE ) + else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED ) { return rollbackAsync(); } - else if ( state == State.FAILED ) - { - // unrecoverable error happened, transaction should've been rolled back on the server - // update state so that this transaction does not remain open - state = State.ROLLED_BACK; - return completedFuture( null ); - } else { return completedFuture( null ); @@ -161,7 +153,12 @@ public CompletionStage commitAsync() } else if ( state == State.ROLLED_BACK ) { - return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) ); + return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) ); + } + else if ( state == State.TERMINATED ) + { + return failedFuture( + new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) ); } else { @@ -174,12 +171,18 @@ public CompletionStage rollbackAsync() { if ( state == State.COMMITTED ) { - return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) ); + return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) ); } else if ( state == State.ROLLED_BACK ) { return completedFuture( null ); } + else if ( state == State.TERMINATED ) + { + // transaction has been terminated by RESET and should be rolled back by the database + state = State.ROLLED_BACK; + return completedFuture( null ); + } else { return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) ); @@ -190,7 +193,6 @@ private BiConsumer transactionClosed( State newState ) { return ( ignore, error ) -> { - // todo: test that this state transition always happens when commit or rollback state = newState; connection.releaseInBackground(); session.setBookmark( bookmark ); @@ -280,18 +282,18 @@ public StatementResult run( Statement statement ) public CompletionStage runAsync( Statement statement ) { ensureCanRunQueries(); + //noinspection unchecked return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this ); } @Override public boolean isOpen() { - return state != State.COMMITTED && state != State.ROLLED_BACK; + return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED; } private void ensureCanRunQueries() { - // todo: test these two new branches if ( state == State.COMMITTED ) { throw new ClientException( "Cannot run more statements in this transaction, it has been committed" ); @@ -300,7 +302,7 @@ else if ( state == State.ROLLED_BACK ) { throw new ClientException( "Cannot run more statements in this transaction, it has been rolled back" ); } - else if ( state == State.FAILED || state == State.MARKED_FAILED ) + else if ( state == State.MARKED_FAILED ) { throw new ClientException( "Cannot run more statements in this transaction, because previous statements in the " + @@ -308,6 +310,11 @@ else if ( state == State.FAILED || state == State.MARKED_FAILED ) "transaction to run another statement." ); } + else if ( state == State.TERMINATED ) + { + throw new ClientException( + "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); + } } @Override @@ -316,21 +323,9 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - public void resultFailed( Throwable error ) - { - if ( isRecoverable( error ) ) - { - failure(); - } - else - { - markToClose(); - } - } - - public void markToClose() + public void markTerminated() { - state = State.FAILED; + state = State.TERMINATED; } public Bookmark bookmark() diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 2f17f3ba7e..33c32414b2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -160,7 +160,7 @@ public void close() getBlocking( lastResultStage .exceptionally( error -> null ) .thenCompose( this::ensureBuffered ) - .thenCompose( error -> forceReleaseResources().thenApply( ignore -> + .thenCompose( error -> releaseResources().thenApply( ignore -> { if ( error != null ) { @@ -177,7 +177,7 @@ public CompletionStage closeAsync() // todo: wait for buffered result? if ( open.compareAndSet( true, false ) ) { - return forceReleaseResources(); + return releaseResources(); } return completedFuture( null ); } @@ -254,7 +254,19 @@ public String lastBookmark() @Override public void reset() { - getBlocking( forceReleaseResources() ); + getBlocking( resetAsync() ); + } + + private CompletionStage resetAsync() + { + return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() ) + .thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ); } @Override @@ -465,42 +477,38 @@ private CompletionStage acquireConnection( AccessMode mode ) return connectionStage; } - private CompletionStage forceReleaseResources() + private CompletionStage releaseResources() { - return rollbackTransaction().thenCompose( ignore -> forceReleaseConnection() ); + return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); } private CompletionStage rollbackTransaction() { - return transactionStage - .exceptionally( error -> null ) // handle previous acquisition failures - .thenCompose( tx -> - { - if ( tx != null && tx.isOpen() ) - { - return tx.rollbackAsync(); - } - return completedFuture( null ); - } ); + return existingTransactionOrNull().thenCompose( tx -> + { + if ( tx != null ) + { + return tx.rollbackAsync(); + } + return completedFuture( null ); + } ).exceptionally( error -> + { + Throwable cause = Futures.completionErrorCause( error ); + logger.error( "Failed to rollback active transaction", cause ); + return null; + } ); } - private CompletionStage forceReleaseConnection() + private CompletionStage releaseConnectionNow() { - return connectionStage - .exceptionally( error -> null ) // handle previous acquisition failures - .thenCompose( connection -> - { - if ( connection != null ) - { - return connection.releaseNow(); - } - return completedFuture( null ); - } ).exceptionally( error -> - { - // todo: this log message looks wrong, should it go to #rollbackTransaction() ? - logger.error( "Failed to rollback active transaction", error ); - return null; - } ); + return existingConnectionOrNull().thenCompose( connection -> + { + if ( connection != null ) + { + return connection.releaseNow(); + } + return completedFuture( null ); + } ); } private CompletionStage ensureNoOpenTxBeforeRunningQuery() @@ -517,14 +525,25 @@ private CompletionStage ensureNoOpenTxBeforeStartingTx() private CompletionStage ensureNoOpenTx( String errorMessage ) { - return transactionStage.exceptionally( error -> null ) - .thenAccept( tx -> - { - if ( tx != null && tx.isOpen() ) - { - throw new ClientException( errorMessage ); - } - } ); + return existingTransactionOrNull().thenAccept( tx -> + { + if ( tx != null ) + { + throw new ClientException( errorMessage ); + } + } ); + } + + private CompletionStage existingTransactionOrNull() + { + return transactionStage + .exceptionally( error -> null ) // handle previous acquisition failures + .thenApply( tx -> tx != null && tx.isOpen() ? tx : null ); + } + + private CompletionStage existingConnectionOrNull() + { + return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures } private void ensureSessionIsOpen() diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java index fb07783b99..e3ad9e6b92 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java @@ -43,6 +43,6 @@ protected void afterSuccess() @Override protected void afterFailure( Throwable error ) { - tx.resultFailed( error ); + tx.failure(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java index c19c92525a..bd17864cf0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java @@ -51,6 +51,7 @@ public static Neo4jException newNeo4jError( String code, String message ) } } + // todo: use this method and close channel after unrecoverable error public static boolean isRecoverable( Throwable error ) { if ( error instanceof Neo4jException ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 8a8c48d0d6..3779b0d28d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -144,13 +144,13 @@ public void shouldBeOpenWhenMarkedForFailure() } @Test - public void shouldBeOpenWhenMarkedToClose() + public void shouldBeClosedWhenMarkedAsTerminated() { ExplicitTransaction tx = beginTx( connectionMock() ); - tx.markToClose(); + tx.markTerminated(); - assertTrue( tx.isOpen() ); + assertFalse( tx.isOpen() ); } @Test @@ -176,11 +176,11 @@ public void shouldBeClosedAfterRollback() } @Test - public void shouldBeClosedWhenMarkedToCloseAndClosed() + public void shouldBeClosedWhenMarkedTerminatedAndClosed() { ExplicitTransaction tx = beginTx( connectionMock() ); - tx.markToClose(); + tx.markTerminated(); tx.close(); assertFalse( tx.isOpen() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java new file mode 100644 index 0000000000..a57f3984ff --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.Statement; + +import static java.util.Collections.emptyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SessionPullAllResponseHandlerTest +{ + @Test + public void shouldReleaseConnectionOnSuccess() + { + Connection connection = newConnectionMock(); + SessionPullAllResponseHandler handler = newHandler( connection ); + + handler.onSuccess( emptyMap() ); + + verify( connection ).releaseInBackground(); + } + + @Test + public void shouldReleaseConnectionOnFailure() + { + Connection connection = newConnectionMock(); + SessionPullAllResponseHandler handler = newHandler( connection ); + + handler.onFailure( new RuntimeException() ); + + verify( connection ).releaseInBackground(); + } + + private SessionPullAllResponseHandler newHandler( Connection connection ) + { + return new SessionPullAllResponseHandler( new Statement( "RETURN 1" ), + new RunResponseHandler( new CompletableFuture<>() ), connection ); + } + + private static Connection newConnectionMock() + { + Connection connection = mock( Connection.class ); + when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); + return connection; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java new file mode 100644 index 0000000000..057801d85c --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.ExplicitTransaction; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.Statement; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TransactionPullAllResponseHandlerTest +{ + @Test + public void shouldMarkTransactionAsFailedOnFailure() + { + Connection connection = mock( Connection.class ); + when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); + ExplicitTransaction tx = mock( ExplicitTransaction.class ); + TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ), + new RunResponseHandler( new CompletableFuture<>() ), connection, tx ); + + handler.onFailure( new RuntimeException() ); + + verify( tx ).failure(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 211ee16621..2b17d2417e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -20,10 +20,11 @@ import org.hamcrest.MatcherAssert; import org.junit.After; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; +import org.junit.rules.Timeout; import java.util.HashSet; import java.util.List; @@ -90,14 +91,13 @@ import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; import static org.neo4j.driver.v1.util.Neo4jRunner.DEFAULT_AUTH_TOKEN; -// todo: unignore and fix all `Session#reset()` tests! public class SessionIT { - @Rule - public TestNeo4j neo4j = new TestNeo4j(); + private final TestNeo4j neo4j = new TestNeo4j(); + private final ExpectedException exception = ExpectedException.none(); @Rule - public ExpectedException exception = ExpectedException.none(); + public final RuleChain ruleChain = RuleChain.outerRule( neo4j ).around( exception ).around( Timeout.seconds( 60 ) ); private Driver driver; private ExecutorService executor; @@ -237,8 +237,7 @@ public void shouldKillLongStreamingResult() throws Throwable @SuppressWarnings( "deprecation" ) @Test - @Ignore - public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable + public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); @@ -246,26 +245,29 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable try ( Session session = driver.session() ) { - Transaction tx = session.beginTransaction(); + Transaction tx1 = session.beginTransaction(); - tx.run( "CALL test.driver.longRunningStatement({seconds})", + tx1.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); Thread.sleep( 1000 ); session.reset(); - exception.expect( ClientException.class ); - exception.expectMessage( startsWith( - "An error has occurred due to the cancellation of executing a previous statement." ) ); + // When + Transaction tx2 = session.beginTransaction(); + + // Then + assertThat( tx2, notNullValue() ); - // When & Then - tx = session.beginTransaction(); - assertThat( tx, notNullValue() ); + exception.expect( ClientException.class ); // errors differ depending of neo4j version + exception.expectMessage( + "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); + + tx1.run( "RETURN 1" ); } } @SuppressWarnings( "deprecation" ) @Test - @Ignore public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable { // Given @@ -278,9 +280,8 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro Thread.sleep( 1000 ); session.reset(); - exception.expect( ClientException.class ); - exception.expectMessage( startsWith( - "An error has occurred due to the cancellation of executing a previous statement." ) ); + exception.expect( Neo4jException.class ); // errors differ depending of neo4j version + exception.expectMessage( containsString( "The transaction has been terminated" ) ); // When & Then session.close(); @@ -288,7 +289,6 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro @SuppressWarnings( "deprecation" ) @Test - @Ignore public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable { // Given @@ -357,7 +357,6 @@ public void run() @SuppressWarnings( "deprecation" ) @Test - @Ignore public void shouldAllowMoreStatementAfterSessionReset() { // Given @@ -377,7 +376,6 @@ public void shouldAllowMoreStatementAfterSessionReset() @SuppressWarnings( "deprecation" ) @Test - @Ignore public void shouldAllowMoreTxAfterSessionReset() { // Given @@ -945,7 +943,7 @@ public Integer execute( Transaction tx ) } } - @Test( timeout = 20_000 ) + @Test public void resetShouldStopQueryWaitingForALock() throws Exception { assumeServerIs31OrLater(); @@ -966,8 +964,7 @@ void performUpdate( Driver driver, int nodeId, int newNodeId, } ); } - @Test( timeout = 20_000 ) - @Ignore + @Test public void resetShouldStopTransactionWaitingForALock() throws Exception { assumeServerIs31OrLater(); @@ -989,8 +986,7 @@ public void performUpdate( Driver driver, int nodeId, int newNodeId, } ); } - @Test( timeout = 20_000 ) - @Ignore + @Test public void resetShouldStopWriteTransactionWaitingForALock() throws Exception { assumeServerIs31OrLater(); @@ -1025,7 +1021,7 @@ public Void execute( Transaction tx ) assertEquals( 1, invocationsOfWork.get() ); } - @Test( timeout = 20_000 ) + @Test public void transactionRunShouldFailOnDeadlocks() throws Exception { final int nodeId1 = 42; @@ -1101,7 +1097,7 @@ public Void call() throws Exception } } - @Test( timeout = 20_000 ) + @Test public void writeTransactionFunctionShouldRetryDeadlocks() throws Exception { final int nodeId1 = 42; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index 8e7ae98559..90c936f41b 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -35,6 +35,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -65,6 +66,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter; import static org.neo4j.driver.internal.util.Matchers.syntaxError; @@ -417,7 +419,7 @@ public void shouldFailToCommitWhenRolledBack() catch ( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); - assertThat( e.getMessage(), containsString( "transaction has already been rolled back" ) ); + assertThat( e.getMessage(), containsString( "transaction has been rolled back" ) ); } } @@ -437,7 +439,7 @@ public void shouldFailToRollbackWhenCommitted() catch ( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); - assertThat( e.getMessage(), containsString( "transaction has already been committed" ) ); + assertThat( e.getMessage(), containsString( "transaction has been committed" ) ); } } @@ -755,6 +757,202 @@ public void shouldConsumeNonEmptyCursor() testConsume( "RETURN 42" ); } + @Test + public void shouldDoNothingWhenCommittedSecondTime() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + assertNull( getBlocking( tx.commitAsync() ) ); + + assertTrue( tx.commitAsync().toCompletableFuture().isDone() ); + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldFailToCommitAfterRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + assertNull( getBlocking( tx.rollbackAsync() ) ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Can't commit, transaction has been rolled back", e.getMessage() ); + } + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldFailToCommitAfterTermination() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + ((ExplicitTransaction) tx).markTerminated(); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Can't commit, transaction has been terminated by `Session#reset()`", e.getMessage() ); + } + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldDoNothingWhenRolledBackSecondTime() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + assertNull( getBlocking( tx.rollbackAsync() ) ); + + assertTrue( tx.rollbackAsync().toCompletableFuture().isDone() ); + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldFailToRollbackAfterCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + assertNull( getBlocking( tx.commitAsync() ) ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Can't rollback, transaction has been committed", e.getMessage() ); + } + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldRollbackAfterTermination() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + ((ExplicitTransaction) tx).markTerminated(); + + assertNull( getBlocking( tx.rollbackAsync() ) ); + assertFalse( tx.isOpen() ); + } + + @Test + public void shouldFailToRunQueryAfterCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + tx.runAsync( "CREATE (:MyLabel)" ); + assertNull( getBlocking( tx.commitAsync() ) ); + + assertEquals( 1, session.run( "MATCH (n:MyLabel) RETURN count(n)" ).single().get( 0 ).asInt() ); + + try + { + getBlocking( tx.runAsync( "CREATE (:MyOtherLabel)" ) ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Cannot run more statements in this transaction, it has been committed", e.getMessage() ); + } + } + + @Test + public void shouldFailToRunQueryAfterRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + tx.runAsync( "CREATE (:MyLabel)" ); + assertNull( getBlocking( tx.rollbackAsync() ) ); + + assertEquals( 0, session.run( "MATCH (n:MyLabel) RETURN count(n)" ).single().get( 0 ).asInt() ); + + try + { + getBlocking( tx.runAsync( "CREATE (:MyOtherLabel)" ) ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Cannot run more statements in this transaction, it has been rolled back", e.getMessage() ); + } + } + + @Test + public void shouldFailToRunQueryWhenMarkedForFailure() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + tx.runAsync( "CREATE (:MyLabel)" ); + tx.failure(); + + try + { + getBlocking( tx.runAsync( "CREATE (:MyOtherLabel)" ) ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); + } + } + + @Test + public void shouldFailToRunQueryWhenTerminated() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + tx.runAsync( "CREATE (:MyLabel)" ); + ((ExplicitTransaction) tx).markTerminated(); + + try + { + getBlocking( tx.runAsync( "CREATE (:MyOtherLabel)" ) ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`", + e.getMessage() ); + } + } + + @Test + public void shouldAllowQueriesWhenMarkedForSuccess() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + tx.runAsync( "CREATE (:MyLabel)" ); + + tx.success(); + + tx.runAsync( "CREATE (:MyLabel)" ); + assertNull( getBlocking( tx.commitAsync() ) ); + + assertEquals( 2, session.run( "MATCH (n:MyLabel) RETURN count(n)" ).single().get( 0 ).asInt() ); + } + + @Test + public void shouldUpdateSessionBookmarkAfterCommit() + { + String bookmarkBefore = session.lastBookmark(); + + getBlocking( session.beginTransactionAsync() + .thenCompose( tx -> tx.runAsync( "CREATE (:MyNode)" ) + .thenCompose( ignore -> tx.commitAsync() ) ) ); + + String bookmarkAfter = session.lastBookmark(); + + assertNotNull( bookmarkAfter ); + assertNotEquals( bookmarkBefore, bookmarkAfter ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index 277fb98b20..f058485b3a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -48,6 +48,8 @@ public class TransactionIT @Rule public TestNeo4jSession session = new TestNeo4jSession(); + private Transaction globalTx; + @Test public void shouldRunAndCommit() throws Throwable { @@ -242,15 +244,13 @@ public void shouldHandleResetBeforeRun() throws Throwable { // Expect exception.expect( ClientException.class ); - exception.expectMessage( "Cannot run more statements in this transaction, it has been rolled back" ); + exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated by" ); // When Transaction tx = session.beginTransaction(); session.reset(); tx.run( "CREATE (n:FirstNode)" ); } - private Transaction globalTx = null; - @SuppressWarnings( "deprecation" ) @Test public void shouldHandleResetFromMultipleThreads() throws Throwable @@ -271,7 +271,7 @@ public void run() } catch ( InterruptedException e ) { - new AssertionError( e ); + throw new AssertionError( e ); } globalTx = session.beginTransaction(); @@ -293,7 +293,7 @@ public void run() } catch ( InterruptedException e ) { - new AssertionError( e ); + throw new AssertionError( e ); } session.reset(); From cdb9febf642999f5ea05e9716a5bf337ce682066 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 12 Oct 2017 18:45:08 +0200 Subject: [PATCH 3/4] Removed unused packstream methods Main those that do flushing. Now it's handled by netty channels. --- .../internal/async/inbound/ByteBufInput.java | 6 -- .../outbound/ChunkAwareByteBufOutput.java | 6 -- .../internal/messaging/MessageFormat.java | 8 -- .../messaging/PackStreamMessageFormatV1.java | 23 ------ .../driver/internal/packstream/PackInput.java | 3 - .../internal/packstream/PackOutput.java | 4 - .../internal/packstream/PackStream.java | 10 --- .../async/inbound/ByteBufInputTest.java | 12 --- .../packstream/BufferedChannelInput.java | 6 -- ...dChannelOutput.java => ChannelOutput.java} | 63 ++++----------- .../internal/packstream/PackStreamTest.java | 80 +------------------ .../internal/util/MessageToByteBufWriter.java | 18 ++--- 12 files changed, 25 insertions(+), 214 deletions(-) rename driver/src/test/java/org/neo4j/driver/internal/packstream/{BufferedChannelOutput.java => ChannelOutput.java} (58%) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ByteBufInput.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ByteBufInput.java index bd07f7d17c..9c53177b4d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ByteBufInput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ByteBufInput.java @@ -39,12 +39,6 @@ public void stop() buf = null; } - @Override - public boolean hasMoreData() - { - return buf.isReadable(); - } - @Override public byte readByte() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java index 29585d4423..10e24a4044 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java @@ -60,12 +60,6 @@ public void stop() currentChunkSize = 0; } - @Override - public PackOutput flush() - { - throw new UnsupportedOperationException( "Flush not supported, this output only writes to a buffer" ); - } - @Override public PackOutput writeByte( byte value ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java index 04d2be6117..1ee761b1dc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java @@ -28,19 +28,11 @@ public interface MessageFormat interface Writer { Writer write( Message msg ) throws IOException; - - Writer flush() throws IOException; } interface Reader { - /** - * Return true is there is another message in the underlying buffer - */ - boolean hasNext() throws IOException; - void read( MessageHandler handler ) throws IOException; - } Writer newWriter( PackOutput output, boolean byteArraySupportEnabled ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java index d1f35dab54..5baa35cf1c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java @@ -326,13 +326,6 @@ private void packValue( Value value ) throws IOException } } - @Override - public Writer flush() throws IOException - { - packer.flush(); - return this; - } - @Override public Writer write( Message msg ) throws IOException { @@ -376,12 +369,6 @@ public Reader( PackInput input ) unpacker = new PackStream.Unpacker( input ); } - @Override - public boolean hasNext() throws IOException - { - return unpacker.hasNext(); - } - /** * Parse a single message into the given consumer. */ @@ -660,14 +647,4 @@ private Map unpackMap() throws IOException return map; } } - - public static class NoOpRunnable implements Runnable - { - @Override - public void run() - { - // no-op - } - } - } diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackInput.java b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackInput.java index 0cff68f9b7..1bfdc4edd6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackInput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackInput.java @@ -26,9 +26,6 @@ */ public interface PackInput { - /** True if there is at least one more consumable byte */ - boolean hasMoreData() throws IOException; - /** Consume one byte */ byte readByte() throws IOException; diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java index fe387a6770..c062565275 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java @@ -25,10 +25,6 @@ */ public interface PackOutput { - // todo: remove flush method - /** If implementation has been buffering data, it should flush those buffers now. */ - PackOutput flush() throws IOException; - /** Produce a single byte */ PackOutput writeByte( byte value ) throws IOException; diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java index e5d67dbed6..dc109aea22 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java +++ b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java @@ -157,11 +157,6 @@ public Packer( PackOutput out ) this.out = out; } - public void flush() throws IOException - { - out.flush(); - } - public void packRaw( byte[] data ) throws IOException { out.writeBytes( data ); @@ -425,11 +420,6 @@ public Unpacker( PackInput in ) this.in = in; } - public boolean hasNext() throws IOException - { - return in.hasMoreData(); - } - public long unpackStructHeader() throws IOException { final byte markerByte = in.readByte(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ByteBufInputTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ByteBufInputTest.java index 6b57de58de..d29697ad03 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ByteBufInputTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ByteBufInputTest.java @@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; @@ -66,17 +65,6 @@ public void shouldThrowWhenStartedTwice() } } - @Test - public void shouldDelegateHasMoreData() - { - ByteBufInput input = new ByteBufInput(); - ByteBuf buf = mock( ByteBuf.class ); - when( buf.isReadable() ).thenReturn( true ); - input.start( buf ); - - assertTrue( input.hasMoreData() ); - } - @Test public void shouldDelegateReadByte() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java index e01585236a..f9bb7938dd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java @@ -44,12 +44,6 @@ public BufferedChannelInput( int bufferCapacity, ReadableByteChannel ch ) this.channel = ch; } - @Override - public boolean hasMoreData() throws IOException - { - return attempt( 1 ); - } - @Override public byte readByte() throws IOException { diff --git a/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/ChannelOutput.java similarity index 58% rename from driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java rename to driver/src/test/java/org/neo4j/driver/internal/packstream/ChannelOutput.java index 6180b62daa..770540b641 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/ChannelOutput.java @@ -20,99 +20,68 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; -public class BufferedChannelOutput implements PackOutput +public class ChannelOutput implements PackOutput { - private final ByteBuffer buffer; private final WritableByteChannel channel; - public BufferedChannelOutput( WritableByteChannel channel ) + public ChannelOutput( WritableByteChannel channel ) { - this( channel, 1024 ); - } - - public BufferedChannelOutput( WritableByteChannel channel, int bufferSize ) - { - this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN ); this.channel = channel; } - @Override - public BufferedChannelOutput flush() throws IOException - { - buffer.flip(); - do { channel.write( buffer ); } while ( buffer.remaining() > 0 ); - buffer.clear(); - return this; - } - @Override public PackOutput writeBytes( byte[] data ) throws IOException { - int length = data.length; - int index = 0; - while ( index < length ) - { - if ( buffer.remaining() == 0 ) - { - flush(); - } - - int amountToWrite = Math.min( buffer.remaining(), length - index ); - - buffer.put( data, index, amountToWrite ); - index += amountToWrite; - } + channel.write( ByteBuffer.wrap( data ) ); return this; } @Override public PackOutput writeByte( byte value ) throws IOException { - ensure( 1 ); - buffer.put( value ); + channel.write( ByteBuffer.wrap( new byte[]{value} ) ); return this; } @Override public PackOutput writeShort( short value ) throws IOException { - ensure( 2 ); + ByteBuffer buffer = ByteBuffer.allocate( Short.BYTES ); buffer.putShort( value ); + buffer.flip(); + channel.write( buffer ); return this; } @Override public PackOutput writeInt( int value ) throws IOException { - ensure( 4 ); + ByteBuffer buffer = ByteBuffer.allocate( Integer.BYTES ); buffer.putInt( value ); + buffer.flip(); + channel.write( buffer ); return this; } @Override public PackOutput writeLong( long value ) throws IOException { - ensure( 8 ); + ByteBuffer buffer = ByteBuffer.allocate( Long.BYTES ); buffer.putLong( value ); + buffer.flip(); + channel.write( buffer ); return this; } @Override public PackOutput writeDouble( double value ) throws IOException { - ensure( 8 ); + ByteBuffer buffer = ByteBuffer.allocate( Double.BYTES ); buffer.putDouble( value ); + buffer.flip(); + channel.write( buffer ); return this; } - - private void ensure( int size ) throws IOException - { - if ( buffer.remaining() < size ) - { - flush(); - } - } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java index e5d55cd211..f7e67765ee 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/PackStreamTest.java @@ -38,14 +38,11 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; -import static junit.framework.TestCase.assertFalse; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class PackStreamTest { @@ -83,14 +80,7 @@ public Machine() { this.output = new ByteArrayOutputStream(); this.writable = Channels.newChannel( this.output ); - this.packer = new PackStream.Packer( new BufferedChannelOutput( this.writable ) ); - } - - public Machine( int bufferSize ) - { - this.output = new ByteArrayOutputStream(); - this.writable = Channels.newChannel( this.output ); - this.packer = new PackStream.Packer( new BufferedChannelOutput( this.writable, bufferSize ) ); + this.packer = new PackStream.Packer( new ChannelOutput( this.writable ) ); } public void reset() @@ -123,7 +113,6 @@ public void testCanPackAndUnpackNull() throws Throwable // When machine.packer().packNull(); - machine.packer().flush(); // Then byte[] bytes = machine.output(); @@ -146,7 +135,6 @@ public void testCanPackAndUnpackTrue() throws Throwable // When machine.packer().pack( true ); - machine.packer().flush(); // Then byte[] bytes = machine.output(); @@ -170,7 +158,6 @@ public void testCanPackAndUnpackFalse() throws Throwable // When machine.packer().pack( false ); - machine.packer().flush(); // Then byte[] bytes = machine.output(); @@ -197,7 +184,6 @@ public void testCanPackAndUnpackTinyIntegers() throws Throwable // When machine.reset(); machine.packer().pack( i ); - machine.packer().flush(); // Then byte[] bytes = machine.output(); @@ -224,7 +210,6 @@ public void testCanPackAndUnpackShortIntegers() throws Throwable // When machine.reset(); machine.packer().pack( i ); - machine.packer().flush(); // Then byte[] bytes = machine.output(); @@ -255,7 +240,6 @@ public void testCanPackAndUnpackPowersOfTwoAsIntegers() throws Throwable // When machine.reset(); machine.packer().pack( n ); - machine.packer().flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -282,7 +266,6 @@ public void testCanPackAndUnpackPowersOfTwoPlusABitAsDoubles() throws Throwable // When machine.reset(); machine.packer().pack( n ); - machine.packer().flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -309,7 +292,6 @@ public void testCanPackAndUnpackPowersOfTwoMinusABitAsDoubles() throws Throwable // When machine.reset(); machine.packer().pack( n ); - machine.packer().flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -327,7 +309,7 @@ public void testCanPackAndUnpackPowersOfTwoMinusABitAsDoubles() throws Throwable public void testCanPackAndUnpackByteArrays() throws Throwable { // Given - Machine machine = new Machine( 17000000 ); + Machine machine = new Machine(); for ( int i = 0; i < 24; i++ ) { @@ -336,7 +318,6 @@ public void testCanPackAndUnpackByteArrays() throws Throwable // When machine.reset(); machine.packer().pack( array ); - machine.packer().flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -352,7 +333,7 @@ public void testCanPackAndUnpackByteArrays() throws Throwable public void testCanPackAndUnpackStrings() throws Throwable { // Given - Machine machine = new Machine( 17000000 ); + Machine machine = new Machine(); for ( int i = 0; i < 24; i++ ) { @@ -361,7 +342,6 @@ public void testCanPackAndUnpackStrings() throws Throwable // When machine.reset(); machine.packer().pack( string ); - machine.packer().flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -383,7 +363,6 @@ public void testCanPackAndUnpackBytes() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.pack( "ABCDEFGHIJ".getBytes() ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -404,7 +383,6 @@ public void testCanPackAndUnpackChar() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.pack( 'A' ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -424,7 +402,6 @@ public void testCanPackAndUnpackString() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.pack( "ABCDEFGHIJ" ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -446,7 +423,6 @@ public void testCanPackAndUnpackSpecialString() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.pack( code ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -466,7 +442,6 @@ public void testCanPackAndUnpackStringFromBytes() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.packString( "ABCDEFGHIJ".getBytes() ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -493,7 +468,6 @@ public void testCanPackAndUnpackSpecialStringFromBytes() throws Throwable assertThat( new String( bytes, UTF_8 ), equalTo( code ) ); packer.packString( bytes ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -516,7 +490,6 @@ public void testCanPackAndUnpackListOneItemAtATime() throws Throwable packer.pack( 12 ); packer.pack( 13 ); packer.pack( 14 ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -539,7 +512,6 @@ public void testCanPackAndUnpackListOfString() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.pack( asList( "eins", "zwei", "drei" ) ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -571,13 +543,9 @@ public void testCanPackAndUnpackListOfStringOneByOne() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.packListHeader( 3 ); - packer.flush(); packer.pack( "eins" ); - packer.flush(); packer.pack( "zwei" ); - packer.flush(); packer.pack( "drei" ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -600,13 +568,9 @@ public void testCanPackAndUnpackListOfSpecialStringOneByOne() throws Throwable // When PackStream.Packer packer = machine.packer(); packer.packListHeader( 3 ); - packer.flush(); packer.pack( "Mjölnir" ); - packer.flush(); packer.pack( "Mjölnir" ); - packer.flush(); packer.pack( "Mjölnir" ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -641,7 +605,6 @@ public void testCanPackAndUnpackStruct() throws Throwable packer.pack( 12 ); packer.pack( asList( "Person", "Employee" ) ); packer.pack( asMap( "name", "Alice", "age", 33 ) ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -683,7 +646,6 @@ public void testCanDoStreamingListUnpacking() throws Throwable Machine machine = new Machine(); PackStream.Packer packer = machine.packer(); packer.pack( asList(1,2,3,asList(4,5)) ); - packer.flush(); // When I unpack this value PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -719,7 +681,6 @@ public void testCanDoStreamingStructUnpacking() throws Throwable packer.pack( 2 ); packer.pack( 3 ); packer.pack( asList( 4,5 ) ); - packer.flush(); // When I unpack this value PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -757,7 +718,6 @@ public void testCanDoStreamingMapUnpacking() throws Throwable packer.pack( "Bob" ); packer.pack( "cat_ages" ); packer.pack( asList( 4.3, true ) ); - packer.flush(); // When I unpack this value PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -782,35 +742,6 @@ public void testCanDoStreamingMapUnpacking() throws Throwable assertEquals( true, e ); } - @Test - public void testHasNext() throws Throwable - { - // Given - Machine machine = new Machine(); - PackStream.Packer packer = machine.packer(); - packer.pack( "name" ); - packer.pack( 1 ); - packer.flush(); - - // When I start unpacking - PackStream.Unpacker unpacker = newUnpacker( machine.output() ); - - // Then - assertTrue( unpacker.hasNext() ); - - // When I unpack the first string - unpacker.unpackString(); - - // Then - assertTrue( unpacker.hasNext() ); - - // When I unpack the integer - unpacker.unpackLong(); - - // Then - assertFalse( unpacker.hasNext() ); - } - @Test public void handlesDataCrossingBufferBoundaries() throws Throwable { @@ -819,7 +750,6 @@ public void handlesDataCrossingBufferBoundaries() throws Throwable PackStream.Packer packer = machine.packer(); packer.pack( Long.MAX_VALUE ); packer.pack( Long.MAX_VALUE ); - packer.flush(); ReadableByteChannel ch = Channels.newChannel( new ByteArrayInputStream( machine.output() ) ); PackStream.Unpacker unpacker = new PackStream.Unpacker( new BufferedChannelInput( 11, ch ) ); @@ -873,7 +803,6 @@ void assertPeekType( PackType type, Object value ) throws IOException Machine machine = new Machine(); PackStream.Packer packer = machine.packer(); packer.pack( value ); - packer.flush(); PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -894,7 +823,6 @@ private void assertPackStringLists( int size, String value ) throws Throwable strings.add( i, value ); } packer.pack( strings ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -921,7 +849,6 @@ private void assertMap( int size ) throws Throwable map.put( Integer.toString( i ), i ); } packer.pack( map ); - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); @@ -951,7 +878,6 @@ private void assertStruct( int size ) throws Throwable packer.pack( i ); } - packer.flush(); // Then PackStream.Unpacker unpacker = newUnpacker( machine.output() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/MessageToByteBufWriter.java b/driver/src/test/java/org/neo4j/driver/internal/util/MessageToByteBufWriter.java index d88cab5caa..ceb34a2219 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/MessageToByteBufWriter.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/MessageToByteBufWriter.java @@ -58,48 +58,42 @@ private static class ByteBufOutput implements PackOutput } @Override - public PackOutput flush() throws IOException - { - return this; - } - - @Override - public PackOutput writeByte( byte value ) throws IOException + public PackOutput writeByte( byte value ) { buf.writeByte( value ); return this; } @Override - public PackOutput writeBytes( byte[] data ) throws IOException + public PackOutput writeBytes( byte[] data ) { buf.writeBytes( data ); return this; } @Override - public PackOutput writeShort( short value ) throws IOException + public PackOutput writeShort( short value ) { buf.writeShort( value ); return this; } @Override - public PackOutput writeInt( int value ) throws IOException + public PackOutput writeInt( int value ) { buf.writeInt( value ); return this; } @Override - public PackOutput writeLong( long value ) throws IOException + public PackOutput writeLong( long value ) { buf.writeLong( value ); return this; } @Override - public PackOutput writeDouble( double value ) throws IOException + public PackOutput writeDouble( double value ) { buf.writeDouble( value ); return this; From b18d7ca6ace33f8c424ca93ef2db8276e9efc798 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 12 Oct 2017 18:51:47 +0200 Subject: [PATCH 4/4] Couple more unit tests for ACK_FAILURE handling --- .../inbound/InboundMessageDispatcherTest.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java index 067f456a04..3d5abfd82c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java @@ -41,6 +41,8 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; @@ -130,6 +132,59 @@ public void shouldSendAckFailureOnFailure() verify( channel ).writeAndFlush( eq( ACK_FAILURE ), any() ); } + @Test + public void shouldNotSendAckFailureOnFailureWhenMuted() + { + Channel channel = mock( Channel.class ); + InboundMessageDispatcher dispatcher = newDispatcher( channel ); + dispatcher.muteAckFailure(); + + dispatcher.queue( mock( ResponseHandler.class ) ); + assertEquals( 1, dispatcher.queuedHandlersCount() ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + + verify( channel, never() ).writeAndFlush( eq( ACK_FAILURE ), any() ); + } + + @Test + public void shouldFailToUnMuteAckFailureWhenNotMuted() + { + InboundMessageDispatcher dispatcher = newDispatcher( mock( Channel.class ) ); + + try + { + dispatcher.unMuteAckFailure(); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertEquals( "Can't un-mute ACK_FAILURE because it's not muted", e.getMessage() ); + } + } + + @Test + public void shouldSendAckFailureAfterUnMute() + { + Channel channel = mock( Channel.class ); + InboundMessageDispatcher dispatcher = newDispatcher( channel ); + dispatcher.muteAckFailure(); + + dispatcher.queue( mock( ResponseHandler.class ) ); + assertEquals( 1, dispatcher.queuedHandlersCount() ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verify( channel, never() ).writeAndFlush( eq( ACK_FAILURE ), any() ); + + dispatcher.unMuteAckFailure(); + + dispatcher.queue( mock( ResponseHandler.class ) ); + assertEquals( 1, dispatcher.queuedHandlersCount() ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verify( channel, times( 1 ) ).writeAndFlush( eq( ACK_FAILURE ), any() ); + } + @Test public void shouldClearFailureOnAckFailureSuccess() {