diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index d851cf226b..00740f0177 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import org.neo4j.driver.internal.async.QueryRunner; @@ -60,31 +59,25 @@ public class ExplicitTransaction implements Transaction private enum State { /** The transaction is running with no explicit success or failure marked */ - ACTIVE( true ), + ACTIVE, /** Running, user marked for success, meaning it'll value committed */ - MARKED_SUCCESS( true ), + MARKED_SUCCESS, /** User marked as failed, meaning it'll be rolled back. */ - MARKED_FAILED( true ), + MARKED_FAILED, /** - * This transaction has been explicitly terminated by calling {@link Session#reset()}. + * This transaction has been terminated either because of explicit {@link Session#reset()} or because of a + * fatal connection error. */ - TERMINATED( false ), + TERMINATED, /** This transaction has successfully committed */ - COMMITTED( false ), + COMMITTED, /** This transaction has been rolled back */ - ROLLED_BACK( false ); - - final boolean txOpen; - - State( boolean txOpen ) - { - this.txOpen = txOpen; - } + ROLLED_BACK } private final Connection connection; @@ -158,7 +151,7 @@ CompletionStage closeAsync() { return commitAsync(); } - else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED ) + else if ( state != State.COMMITTED && state != State.ROLLED_BACK ) { return rollbackAsync(); } @@ -181,14 +174,14 @@ else if ( state == State.ROLLED_BACK ) } else if ( state == State.TERMINATED ) { - return failedFuture( - new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) ); + transactionClosed( State.ROLLED_BACK ); + return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); } else { return resultCursors.retrieveNotConsumedError() .thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) ) - .whenComplete( transactionClosed( State.COMMITTED ) ); + .whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) ); } } @@ -205,15 +198,15 @@ else if ( state == State.ROLLED_BACK ) } else if ( state == State.TERMINATED ) { - // transaction has been terminated by RESET and should be rolled back by the database - state = State.ROLLED_BACK; + // no need for explicit rollback, transaction should've been rolled back by the database + transactionClosed( State.ROLLED_BACK ); return completedWithNull(); } else { return resultCursors.retrieveNotConsumedError() .thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) ) - .whenComplete( transactionClosed( State.ROLLED_BACK ) ); + .whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) ); } } @@ -314,15 +307,14 @@ else if ( state == State.MARKED_FAILED ) } else if ( state == State.TERMINATED ) { - throw new ClientException( - "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); + throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" ); } } @Override public boolean isOpen() { - return state.txOpen; + return state != State.COMMITTED && state != State.ROLLED_BACK; } @Override @@ -394,14 +386,11 @@ else if ( commitOrRollbackError != null ) }; } - private BiConsumer transactionClosed( State newState ) + private void transactionClosed( State newState ) { - return ( ignore, error ) -> - { - state = newState; - connection.release(); // release in background - session.setBookmark( bookmark ); - }; + state = newState; + connection.release(); // release in background + session.setBookmark( bookmark ); } private void terminateConnectionOnThreadInterrupt( String reason ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index d5250e3848..1a90f3d5d7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -46,6 +46,7 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.types.TypeSystem; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -257,13 +258,24 @@ public void reset() private CompletionStage resetAsync() { - return existingTransactionOrNull().thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ).thenCompose( ignore -> releaseConnection() ); + return existingTransactionOrNull() + .thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ) + .thenCompose( ignore -> connectionStage ) + .thenCompose( connection -> + { + if ( connection != null ) + { + // there exists an active connection, send a RESET message over it + return connection.reset(); + } + return completedWithNull(); + } ); } @Override @@ -436,7 +448,8 @@ private CompletionStage beginTransactionAsync( AccessMode m { ensureSessionIsOpen(); - transactionStage = ensureNoOpenTxBeforeStartingTx() + // create a chain that acquires connection and starts a transaction + CompletionStage newTransactionStage = ensureNoOpenTxBeforeStartingTx() .thenCompose( ignore -> acquireConnection( mode ) ) .thenCompose( connection -> { @@ -444,7 +457,23 @@ private CompletionStage beginTransactionAsync( AccessMode m return tx.beginAsync( bookmark ); } ); - return transactionStage; + // update the reference to the only known transaction + CompletionStage currentTransactionStage = transactionStage; + + transactionStage = newTransactionStage + .exceptionally( error -> null ) // ignore errors from starting new transaction + .thenCompose( tx -> + { + if ( tx == null ) + { + // failed to begin new transaction, keep reference to the existing one + return currentTransactionStage; + } + // new transaction started, keep reference to it + return completedFuture( tx ); + } ); + + return newTransactionStage; } private CompletionStage acquireConnection( AccessMode mode ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index cf3d2b2211..3d3bb0c9e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -28,6 +28,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler; import org.neo4j.driver.internal.handlers.ResetResponseHandler; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.PullAllMessage; @@ -39,6 +40,7 @@ import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Value; +import static java.util.Collections.emptyMap; import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason; public class NettyConnection implements Connection @@ -108,15 +110,24 @@ public void runAndFlush( String statement, Map parameters, Respons } } + @Override + public CompletionStage reset() + { + CompletableFuture result = new CompletableFuture<>(); + ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result ); + writeResetMessageIfNeeded( handler, true ); + return result; + } + @Override public CompletionStage release() { if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) ) { - // auto-read could've been disabled, re-enable it to automatically receive response for RESET - setAutoRead( true ); + ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel, + channelPool, messageDispatcher, clock, releaseFuture ); - reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) ); + writeResetMessageIfNeeded( handler, false ); } return releaseFuture; } @@ -152,12 +163,21 @@ private void run( String statement, Map parameters, ResponseHandle pullAllHandler, flush ); } - private void reset( ResponseHandler resetHandler ) + private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean isSessionReset ) { channel.eventLoop().execute( () -> { - messageDispatcher.muteAckFailure(); - writeAndFlushMessage( ResetMessage.RESET, resetHandler ); + if ( isSessionReset && !isOpen() ) + { + resetHandler.onSuccess( emptyMap() ); + } + else + { + messageDispatcher.muteAckFailure(); + // auto-read could've been disabled, re-enable it to automatically receive response for RESET + setAutoRead( true ); + writeAndFlushMessage( ResetMessage.RESET, resetHandler ); + } } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index 264a4a61ad..b1f35ab26e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -71,6 +71,12 @@ public void runAndFlush( String statement, Map parameters, Respons newRoutingResponseHandler( pullAllHandler ) ); } + @Override + public CompletionStage reset() + { + return delegate.reset(); + } + @Override public boolean isOpen() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 88df51a64b..a0571c6dad 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -33,6 +33,7 @@ import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE; @@ -157,14 +158,24 @@ public void handleIgnoredMessage() log.debug( "S: IGNORED" ); ResponseHandler handler = handlers.remove(); + + Throwable error; if ( currentError != null ) { - handler.onFailure( currentError ); + error = currentError; + } + else if ( ackFailureMuted ) + { + error = new ClientException( "Database ignored the request because session has been reset" ); } else { - log.warn( "Received IGNORED message for handler %s but error is missing", handler ); + log.warn( "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " + + "Current handlers %s", handler, handlers ); + + error = new ClientException( "Database ignored the request" ); } + handler.onFailure( error ); } public void handleFatalError( Throwable error ) @@ -212,15 +223,9 @@ public void muteAckFailure() * {@link #muteAckFailure()} when sending RESET message. *

* This method is not thread-safe and should only be executed by the event loop thread. - * - * @throws IllegalStateException if ACK_FAILURE is not muted right now. */ public void unMuteAckFailure() { - if ( !ackFailureMuted ) - { - throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" ); - } ackFailureMuted = false; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java new file mode 100644 index 0000000000..5b2611240f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import io.netty.channel.Channel; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; + +import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; + +public class ChannelReleasingResetResponseHandler extends ResetResponseHandler +{ + private final Channel channel; + private final ChannelPool pool; + private final Clock clock; + + public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool, + InboundMessageDispatcher messageDispatcher, Clock clock, CompletableFuture releaseFuture ) + { + super( messageDispatcher, releaseFuture ); + this.channel = channel; + this.pool = pool; + this.clock = clock; + } + + @Override + protected void resetCompleted( CompletableFuture completionFuture ) + { + setLastUsedTimestamp( channel, clock.millis() ); + + Future released = pool.release( channel ); + released.addListener( ignore -> completionFuture.complete( null ) ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index bc57e3f8c4..ad01eb85db 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -18,62 +18,50 @@ */ package org.neo4j.driver.internal.handlers; -import io.netty.channel.Channel; -import io.netty.channel.pool.ChannelPool; -import io.netty.util.concurrent.Future; - import java.util.Map; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.Value; -import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; - public class ResetResponseHandler implements ResponseHandler { - private final Channel channel; - private final ChannelPool pool; private final InboundMessageDispatcher messageDispatcher; - private final Clock clock; - private final CompletableFuture releaseFuture; + private final CompletableFuture completionFuture; - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, - Clock clock, CompletableFuture releaseFuture ) + public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, CompletableFuture completionFuture ) { - this.channel = channel; - this.pool = pool; this.messageDispatcher = messageDispatcher; - this.clock = clock; - this.releaseFuture = releaseFuture; + this.completionFuture = completionFuture; } @Override - public void onSuccess( Map metadata ) + public final void onSuccess( Map metadata ) { - releaseChannel(); + resetCompleted(); } @Override - public void onFailure( Throwable error ) + public final void onFailure( Throwable error ) { - releaseChannel(); + resetCompleted(); } @Override - public void onRecord( Value[] fields ) + public final void onRecord( Value[] fields ) { throw new UnsupportedOperationException(); } - private void releaseChannel() + private void resetCompleted() { messageDispatcher.unMuteAckFailure(); - setLastUsedTimestamp( channel, clock.millis() ); + resetCompleted( completionFuture ); + } - Future released = pool.release( channel ); - released.addListener( ignore -> releaseFuture.complete( null ) ); + protected void resetCompleted( CompletableFuture completionFuture ) + { + completionFuture.complete( null ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java index d0fc63f71c..d153601005 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java @@ -20,6 +20,7 @@ import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.ErrorUtil; import org.neo4j.driver.v1.Statement; import static java.util.Objects.requireNonNull; @@ -43,6 +44,13 @@ protected void afterSuccess() @Override protected void afterFailure( Throwable error ) { - tx.failure(); + if ( ErrorUtil.isFatal( error ) ) + { + tx.markTerminated(); + } + else + { + tx.failure(); + } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index adbb83bb0f..d537f54360 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -39,6 +39,8 @@ void run( String statement, Map parameters, ResponseHandler runHan void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ); + CompletionStage reset(); + CompletionStage release(); void terminateAndRelease( String reason ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 56f86753e5..4c3e42bbe6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -26,6 +26,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; @@ -156,7 +157,7 @@ public void shouldBeClosedWhenMarkedAsTerminated() tx.markTerminated(); - assertFalse( tx.isOpen() ); + assertTrue( tx.isOpen() ); } @Test @@ -256,6 +257,38 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds() verify( connection, never() ).release(); } + @Test + public void shouldReleaseConnectionWhenTerminatedAndCommitted() + { + Connection connection = connectionMock(); + ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); + + tx.markTerminated(); + try + { + await( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException ignore ) + { + } + + assertFalse( tx.isOpen() ); + verify( connection ).release(); + } + + @Test + public void shouldReleaseConnectionWhenTerminatedAndRolledBack() + { + Connection connection = connectionMock(); + ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); + + tx.markTerminated(); + await( tx.rollbackAsync() ); + + verify( connection ).release(); + } + private static ExplicitTransaction beginTx( Connection connection ) { return beginTx( connection, Bookmark.empty() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 1f5bf6ba63..f1a4681760 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -51,7 +51,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -91,6 +90,7 @@ public void setUp() { connection = connectionMock(); when( connection.release() ).thenReturn( completedWithNull() ); + when( connection.reset() ).thenReturn( completedWithNull() ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); @@ -487,27 +487,17 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows() } @Test - public void connectionShouldBeReleasedAfterSessionReset() + public void connectionShouldBeResetAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); session.run( "RETURN 1" ); + verify( connection, never() ).reset(); verify( connection, never() ).release(); session.reset(); - verify( connection ).release(); - } - - @Test - public void transactionShouldBeRolledBackAfterSessionReset() - { - NetworkSession session = newSession( connectionProvider, READ ); - Transaction tx = session.beginTransaction(); - - assertTrue( tx.isOpen() ); - - session.reset(); - assertFalse( tx.isOpen() ); + verify( connection ).reset(); + verify( connection, never() ).release(); } @Test @@ -663,22 +653,64 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() } @Test - public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + public void shouldMarkTransactionAsTerminatedAndThenResetConnectionOnReset() { NetworkSession session = newSession( connectionProvider, READ ); Transaction tx = session.beginTransaction(); assertTrue( tx.isOpen() ); - when( connection.release() ).then( invocation -> - { - // verify that tx is not open when connection is released - assertFalse( tx.isOpen() ); - return completedWithNull(); - } ); + verify( connection, never() ).reset(); session.reset(); - verify( connection ).release(); + verify( connection ).reset(); + } + + @Test + public void shouldNotAllowStartingMultipleTransactions() + { + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + for ( int i = 0; i < 5; i++ ) + { + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + } + } + + @Test + public void shouldAllowStartingTransactionAfterCurrentOneIsClosed() + { + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + + tx.close(); + + assertNotNull( session.beginTransaction() ); } private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java index 16d7a39003..883ee0e37b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java @@ -27,8 +27,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.RuleChain; -import org.junit.rules.Timeout; import java.io.IOException; import java.net.ConnectException; @@ -61,9 +59,8 @@ public class ChannelConnectorImplTest { - private final TestNeo4j neo4j = new TestNeo4j(); @Rule - public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 60 ) ).around( neo4j ); + public final TestNeo4j neo4j = new TestNeo4j(); private Bootstrap bootstrap; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index 471b9592cc..b96f39b6e8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -29,6 +29,7 @@ import org.mockito.ArgumentCaptor; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,6 +55,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; @@ -77,14 +79,14 @@ public void tearDown() throws Exception @Test public void shouldBeOpenAfterCreated() { - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); assertTrue( connection.isOpen() ); } @Test public void shouldNotBeOpenAfterRelease() { - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); assertFalse( connection.isOpen() ); } @@ -92,10 +94,7 @@ public void shouldNotBeOpenAfterRelease() @Test public void shouldSendResetOnRelease() { - EmbeddedChannel channel = new EmbeddedChannel(); - InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); - + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); connection.release(); @@ -128,19 +127,21 @@ public void shouldWriteForceReleaseInEventLoopThread() throws Exception @Test public void shouldEnableAutoReadWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( false ); NettyConnection connection = newConnection( channel ); connection.release(); + channel.runPendingTasks(); + assertTrue( channel.config().isAutoRead() ); } @Test public void shouldNotDisableAutoReadWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( true ); NettyConnection connection = newConnection( channel ); @@ -155,7 +156,7 @@ public void shouldNotRunWhenReleased() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -170,7 +171,7 @@ public void shouldNotRunAndFlushWhenReleased() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -185,7 +186,7 @@ public void shouldNotRunWhenTerminated() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.terminateAndRelease( "42" ); connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -200,7 +201,7 @@ public void shouldNotRunAndFlushWhenTerminated() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.terminateAndRelease( "42" ); connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -213,7 +214,7 @@ public void shouldNotRunAndFlushWhenTerminated() @Test public void shouldReturnServerAddressWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); BoltServerAddress address = new BoltServerAddress( "host", 4242 ); ChannelAttributes.setServerAddress( channel, address ); @@ -226,7 +227,7 @@ public void shouldReturnServerAddressWhenReleased() @Test public void shouldReturnServerVersionWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ServerVersion version = ServerVersion.v3_2_0; ChannelAttributes.setServerVersion( channel, version ); @@ -239,10 +240,7 @@ public void shouldReturnServerVersionWhenReleased() @Test public void shouldReturnSameCompletionStageFromRelease() { - EmbeddedChannel channel = new EmbeddedChannel(); - InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); - + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); CompletionStage releaseStage1 = connection.release(); @@ -263,7 +261,7 @@ public void shouldReturnSameCompletionStageFromRelease() @Test public void shouldEnableAutoRead() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( false ); NettyConnection connection = newConnection( channel ); @@ -275,7 +273,7 @@ public void shouldEnableAutoRead() @Test public void shouldDisableAutoRead() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( true ); NettyConnection connection = newConnection( channel ); @@ -287,7 +285,7 @@ public void shouldDisableAutoRead() @Test public void shouldSetTerminationReasonOnChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); String reason = "Something really bad has happened"; @@ -299,7 +297,7 @@ public void shouldSetTerminationReasonOnChannelWhenTerminated() @Test public void shouldCloseChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); assertTrue( channel.isActive() ); @@ -311,7 +309,7 @@ public void shouldCloseChannelWhenTerminated() @Test public void shouldReleaseChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -324,7 +322,7 @@ public void shouldReleaseChannelWhenTerminated() @Test public void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -342,7 +340,7 @@ public void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() @Test public void shouldNotReleaseAfterTermination() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -356,6 +354,93 @@ public void shouldNotReleaseAfterTermination() verify( pool ).release( channel ); } + @Test + public void shouldSendResetMessageWhenReset() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + assertEquals( 1, channel.outboundMessages().size() ); + assertEquals( RESET, channel.readOutbound() ); + } + + @Test + public void shouldCompleteResetFutureWhenSuccessResponseArrives() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + assertFalse( resetFuture.isDone() ); + + messageDispatcher( channel ).handleSuccessMessage( emptyMap() ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldCompleteResetFutureWhenFailureResponseArrives() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + assertFalse( resetFuture.isDone() ); + + messageDispatcher( channel ).handleFailureMessage( "Neo.TransientError.Transaction.Terminated", "Message" ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldDoNothingInResetWhenClosed() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + connection.release(); + channel.runPendingTasks(); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + + assertEquals( 1, channel.outboundMessages().size() ); + assertEquals( RESET, channel.readOutbound() ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldMuteAckFailureWhenReset() + { + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + EmbeddedChannel channel = newChannel( messageDispatcher ); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + verify( messageDispatcher ).muteAckFailure(); + } + + @Test + public void shouldEnableAutoReadWhenDoingReset() + { + EmbeddedChannel channel = newChannel(); + channel.config().setAutoRead( false ); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + assertTrue( channel.config().isAutoRead() ); + } + private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception { EmbeddedChannel channel = spy( new EmbeddedChannel() ); @@ -390,6 +475,21 @@ private void shutdownEventLoop() throws Exception } } + private static EmbeddedChannel newChannel() + { + EmbeddedChannel channel = new EmbeddedChannel(); + InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); + ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + return channel; + } + + private static EmbeddedChannel newChannel( InboundMessageDispatcher messageDispatcher ) + { + EmbeddedChannel channel = new EmbeddedChannel(); + ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + return channel; + } + private static NettyConnection newConnection( Channel channel ) { return newConnection( channel, mock( ChannelPool.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java index ca55f85006..104d72be3a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java @@ -30,8 +30,10 @@ import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -148,19 +150,18 @@ public void shouldNotSendAckFailureOnFailureWhenMuted() } @Test - public void shouldFailToUnMuteAckFailureWhenNotMuted() + public void shouldUnMuteAckFailureWhenNotMuted() { - InboundMessageDispatcher dispatcher = newDispatcher( mock( Channel.class ) ); + Channel channel = mock( Channel.class ); + InboundMessageDispatcher dispatcher = newDispatcher( channel ); - try - { - dispatcher.unMuteAckFailure(); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertEquals( "Can't un-mute ACK_FAILURE because it's not muted", e.getMessage() ); - } + dispatcher.unMuteAckFailure(); + + dispatcher.queue( mock( ResponseHandler.class ) ); + assertEquals( 1, dispatcher.queuedHandlersCount() ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verify( channel ).writeAndFlush( eq( ACK_FAILURE ), any() ); } @Test @@ -268,7 +269,49 @@ public void shouldDequeHandlerOnIgnored() dispatcher.handleIgnoredMessage(); assertEquals( 0, dispatcher.queuedHandlersCount() ); - verifyZeroInteractions( handler ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWithExistingError() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler1 = mock( ResponseHandler.class ); + ResponseHandler handler2 = mock( ResponseHandler.class ); + + dispatcher.queue( handler1 ); + dispatcher.queue( handler2 ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verifyFailure( handler1 ); + verifyZeroInteractions( handler2 ); + + dispatcher.handleIgnoredMessage(); + verifyFailure( handler2 ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWhenHandlingReset() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler = mock( ResponseHandler.class ); + dispatcher.queue( handler ); + + dispatcher.muteAckFailure(); + dispatcher.handleIgnoredMessage(); + + verify( handler ).onFailure( any( ClientException.class ) ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWhenNoErrorAndNotHandlingReset() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler = mock( ResponseHandler.class ); + dispatcher.queue( handler ); + + dispatcher.handleIgnoredMessage(); + + verify( handler ).onFailure( any( ClientException.class ) ); } @Test @@ -296,7 +339,7 @@ public void shouldNotSupportInitMessage() try { - dispatcher.handleInitMessage( "Client", Collections.emptyMap() ); + dispatcher.handleInitMessage( "Client", emptyMap() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -312,7 +355,7 @@ public void shouldNotSupportRunMessage() try { - dispatcher.handleRunMessage( "RETURN 1", Collections.emptyMap() ); + dispatcher.handleRunMessage( "RETURN 1", emptyMap() ); fail( "Exception expected" ); } catch ( Exception e ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java new file mode 100644 index 0000000000..1d2ef5001d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.junit.After; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.FakeClock; + +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; + +public class ChannelReleasingResetResponseHandlerTest +{ + private final EmbeddedChannel channel = new EmbeddedChannel(); + private final InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + + @Test + public void shouldReleaseChannelOnSuccess() + { + ChannelPool pool = newChannelPoolMock(); + FakeClock clock = new FakeClock(); + clock.progress( 5 ); + CompletableFuture releaseFuture = new CompletableFuture<>(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + + handler.onSuccess( emptyMap() ); + + verifyLastUsedTimestamp( 5 ); + verify( pool ).release( eq( channel ) ); + assertTrue( releaseFuture.isDone() ); + assertFalse( releaseFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldReleaseChannelOnFailure() + { + ChannelPool pool = newChannelPoolMock(); + FakeClock clock = new FakeClock(); + clock.progress( 100 ); + CompletableFuture releaseFuture = new CompletableFuture<>(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + + handler.onFailure( new RuntimeException() ); + + verifyLastUsedTimestamp( 100 ); + verify( pool ).release( eq( channel ) ); + assertTrue( releaseFuture.isDone() ); + assertFalse( releaseFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldUnMuteAckFailureOnSuccess() + { + ChannelPool pool = newChannelPoolMock(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + + handler.onSuccess( emptyMap() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + + @Test + public void shouldUnMuteAckFailureOnFailure() + { + ChannelPool pool = newChannelPoolMock(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + + handler.onFailure( new RuntimeException() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + + private void verifyLastUsedTimestamp( int expectedValue ) + { + assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); + } + + private ChannelReleasingResetResponseHandler newHandler( ChannelPool pool, Clock clock, + CompletableFuture releaseFuture ) + { + return new ChannelReleasingResetResponseHandler( channel, pool, messageDispatcher, clock, releaseFuture ); + } + + private static ChannelPool newChannelPoolMock() + { + ChannelPool pool = mock( ChannelPool.class ); + Future releasedFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture( null ); + when( pool.release( any() ) ).thenReturn( releasedFuture ); + return pool; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index 3e87ab1376..92837f4b88 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -18,112 +18,96 @@ */ package org.neo4j.driver.internal.handlers; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.channel.pool.ChannelPool; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; -import org.junit.After; import org.junit.Test; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.FakeClock; import static java.util.Collections.emptyMap; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; +import static org.neo4j.driver.v1.Values.values; public class ResetResponseHandlerTest { - private final EmbeddedChannel channel = new EmbeddedChannel(); - private final InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); - - @After - public void tearDown() - { - channel.finishAndReleaseAll(); - } - @Test - public void shouldReleaseChannelOnSuccess() + public void shouldCompleteFutureOnSuccess() throws Exception { - ChannelPool pool = newChannelPoolMock(); - FakeClock clock = new FakeClock(); - clock.progress( 5 ); - CompletableFuture releaseFuture = new CompletableFuture<>(); - ResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + CompletableFuture future = new CompletableFuture<>(); + ResetResponseHandler handler = newHandler( future ); + + assertFalse( future.isDone() ); handler.onSuccess( emptyMap() ); - verifyLastUsedTimestamp( 5 ); - verify( pool ).release( eq( channel ) ); - assertTrue( releaseFuture.isDone() ); - assertFalse( releaseFuture.isCompletedExceptionally() ); + assertTrue( future.isDone() ); + assertNull( future.get() ); } @Test - public void shouldReleaseChannelOnFailure() + public void shouldUnMuteAckFailureOnSuccess() { - ChannelPool pool = newChannelPoolMock(); - FakeClock clock = new FakeClock(); - clock.progress( 100 ); - CompletableFuture releaseFuture = new CompletableFuture<>(); - ResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + ResetResponseHandler handler = newHandler( messageDispatcher, new CompletableFuture<>() ); - handler.onFailure( new RuntimeException() ); + handler.onSuccess( emptyMap() ); - verifyLastUsedTimestamp( 100 ); - verify( pool ).release( eq( channel ) ); - assertTrue( releaseFuture.isDone() ); - assertFalse( releaseFuture.isCompletedExceptionally() ); + verify( messageDispatcher ).unMuteAckFailure(); } @Test - public void shouldUnMuteAckFailureOnSuccess() + public void shouldCompleteFutureOnFailure() throws Exception { - ChannelPool pool = newChannelPoolMock(); - ResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + CompletableFuture future = new CompletableFuture<>(); + ResetResponseHandler handler = newHandler( future ); - handler.onSuccess( emptyMap() ); + assertFalse( future.isDone() ); - verify( messageDispatcher ).unMuteAckFailure(); + handler.onFailure( new RuntimeException() ); + + assertTrue( future.isDone() ); + assertNull( future.get() ); } @Test public void shouldUnMuteAckFailureOnFailure() { - ChannelPool pool = newChannelPoolMock(); - ResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + ResetResponseHandler handler = newHandler( messageDispatcher, new CompletableFuture<>() ); handler.onFailure( new RuntimeException() ); verify( messageDispatcher ).unMuteAckFailure(); } - private void verifyLastUsedTimestamp( int expectedValue ) + @Test + public void shouldThrowWhenOnRecord() { - assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); + ResetResponseHandler handler = newHandler( new CompletableFuture<>() ); + + try + { + handler.onRecord( values( 1, 2, 3 ) ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } } - private ResetResponseHandler newHandler( ChannelPool pool, Clock clock, CompletableFuture releaseFuture ) + private static ResetResponseHandler newHandler( CompletableFuture future ) { - return new ResetResponseHandler( channel, pool, messageDispatcher, clock, releaseFuture ); + return new ResetResponseHandler( mock( InboundMessageDispatcher.class ), future ); } - private static ChannelPool newChannelPoolMock() + private static ResetResponseHandler newHandler( InboundMessageDispatcher messageDispatcher, + CompletableFuture future ) { - ChannelPool pool = mock( ChannelPool.class ); - Future releasedFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture( null ); - when( pool.release( any() ) ).thenReturn( releasedFuture ); - return pool; + return new ResetResponseHandler( messageDispatcher, future ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java index 59b635357a..073ad596b9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java @@ -20,6 +20,7 @@ import org.junit.Test; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.BoltServerAddress; @@ -27,6 +28,10 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.exceptions.TransientException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -35,7 +40,26 @@ public class TransactionPullAllResponseHandlerTest { @Test - public void shouldMarkTransactionAsFailedOnFailure() + public void shouldMarkTransactionAsFailedOnNonFatalFailures() + { + testErrorHandling( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ), false ); + testErrorHandling( new ClientException( "Neo.ClientError.Procedure.ProcedureCallFailed", "" ), false ); + testErrorHandling( new TransientException( "Neo.TransientError.Transaction.Terminated", "" ), false ); + testErrorHandling( new TransientException( "Neo.TransientError.General.DatabaseUnavailable", "" ), false ); + } + + @Test + public void shouldMarkTransactionAsTerminatedOnFatalFailures() + { + testErrorHandling( new RuntimeException(), true ); + testErrorHandling( new IOException(), true ); + testErrorHandling( new ServiceUnavailableException( "" ), true ); + testErrorHandling( new SessionExpiredException( "" ), true ); + testErrorHandling( new SessionExpiredException( "" ), true ); + testErrorHandling( new ClientException( "Neo.ClientError.Request.Invalid" ), true ); + } + + private static void testErrorHandling( Throwable error, boolean fatal ) { Connection connection = mock( Connection.class ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); @@ -44,8 +68,15 @@ public void shouldMarkTransactionAsFailedOnFailure() TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ), new RunResponseHandler( new CompletableFuture<>() ), connection, tx ); - handler.onFailure( new RuntimeException() ); + handler.onFailure( error ); - verify( tx ).failure(); + if ( fatal ) + { + verify( tx ).markTerminated(); + } + else + { + verify( tx ).failure(); + } } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java index 7c8cac4ddb..59b047d28b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.BootstrapFactory; import org.neo4j.driver.internal.async.ChannelConnector; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -36,11 +37,24 @@ public class ChannelTrackingDriverFactory extends DriverFactoryWithClock { private final List channels = new CopyOnWriteArrayList<>(); + private final int eventLoopThreads; private ConnectionPool pool; public ChannelTrackingDriverFactory( Clock clock ) + { + this( 0, clock ); + } + + public ChannelTrackingDriverFactory( int eventLoopThreads, Clock clock ) { super( clock ); + this.eventLoopThreads = eventLoopThreads; + } + + @Override + protected Bootstrap createBootstrap() + { + return eventLoopThreads == 0 ? super.createBootstrap() : BootstrapFactory.newBootstrap( eventLoopThreads ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java index 559b3aa101..cb7ec6cba2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java @@ -140,6 +140,12 @@ public void runAndFlush( String statement, Map parameters, Respons delegate.runAndFlush( statement, parameters, runHandler, pullAllHandler ); } + @Override + public CompletionStage reset() + { + return delegate.reset(); + } + @Override public CompletionStage release() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 318c7f0d55..4b45abc90e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.v1.integration; -import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -38,7 +37,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; @@ -60,7 +58,6 @@ import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.exceptions.AuthenticationException; import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.summary.ResultSummary; @@ -71,13 +68,9 @@ import static java.lang.String.format; import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyArray; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; @@ -165,281 +158,6 @@ public void shouldHandleNullAuthToken() throws Throwable driver = GraphDatabase.driver( neo4j.uri(), token ); } - @Test - public void shouldKillLongRunningStatement() throws Throwable - { - neo4j.ensureProcedures( "longRunningStatement.jar" ); - // Given - int executionTimeout = 10; // 10s - final int killTimeout = 1; // 1s - long startTime = -1, endTime; - - try ( Session session = neo4j.driver().session() ) - { - StatementResult result = - session.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", executionTimeout ) ); - - resetSessionAfterTimeout( session, killTimeout ); - - // When - startTime = System.currentTimeMillis(); - result.consume();// blocking to run the statement - - fail( "Should have got an exception about statement get killed." ); - } - catch ( Neo4jException e ) - { - endTime = System.currentTimeMillis(); - assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset - assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished - } - catch ( Exception e ) - { - fail( "Should be a Neo4jException" ); - } - } - - @Test - public void shouldKillLongStreamingResult() throws Throwable - { - neo4j.ensureProcedures( "longRunningStatement.jar" ); - // Given - int executionTimeout = 10; // 10s - final int killTimeout = 1; // 1s - long startTime = -1, endTime; - int recordCount = 0; - - try ( final Session session = neo4j.driver().session() ) - { - StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", - parameters( "seconds", executionTimeout ) ); - - resetSessionAfterTimeout( session, killTimeout ); - - // When - startTime = System.currentTimeMillis(); - while ( result.hasNext() ) - { - result.next(); - recordCount++; - } - - fail( "Should have got an exception about streaming get killed." ); - } - catch ( ClientException e ) - { - endTime = System.currentTimeMillis(); - assertThat( e.code(), equalTo( "Neo.ClientError.Procedure.ProcedureCallFailed" ) ); - assertThat( recordCount, greaterThan( 1 ) ); - - assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset - assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - try ( Session session = neo4j.driver().session() ) - { - Transaction tx1 = session.beginTransaction(); - - tx1.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - // When - Transaction tx2 = session.beginTransaction(); - - // Then - assertThat( tx2, notNullValue() ); - - exception.expect( ClientException.class ); // errors differ depending of neo4j version - exception.expectMessage( - "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); - - tx1.run( "RETURN 1" ); - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - Session session = neo4j.driver().session(); - session.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - exception.expect( Neo4jException.class ); // errors differ depending of neo4j version - exception.expectMessage( containsString( "The transaction has been terminated" ) ); - - // When & Then - session.close(); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - try ( Session session = neo4j.driver().session() ) - { - Transaction tx = session.beginTransaction(); - - StatementResult procedureResult = tx.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - try - { - procedureResult.consume(); - fail( "Should procedure throw an exception as we interrupted procedure call" ); - } - catch ( Neo4jException e ) - { - assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); - } - catch ( Throwable e ) - { - fail( "Expected exception is different from what we've received: " + e.getMessage() ); - } - - // When - tx = session.beginTransaction(); - tx.run( "CREATE (n:FirstNode)" ); - tx.success(); - tx.close(); - - // Then - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - MatcherAssert.assertThat( nodes, equalTo( 1L ) ); - } - } - - @SuppressWarnings( "deprecation" ) - private void resetSessionAfterTimeout( final Session session, final int timeout ) - { - new Thread( () -> - { - try - { - Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds - } - catch ( InterruptedException e ) - { - e.printStackTrace(); - } - finally - { - session.reset(); // reset the session after timeout - } - } ).start(); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreStatementAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - - session.run( "Return 1" ).consume(); - - // When reset the state of this session - session.reset(); - - // Then can run successfully more statements without any error - session.run( "Return 2" ).consume(); - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreTxAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 1" ); - tx.success(); - } - - // When reset the state of this session - session.reset(); - - // Then can run more Tx - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 2" ); - tx.success(); - } - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - // When reset the state of this session - session.reset(); - // Then - tx.run( "Return 1" ); - fail( "Should not allow tx run as tx is already failed." ); - } - catch ( Exception e ) - { - assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); - } - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreTxAfterSessionResetInTx() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - // When reset the state of this session - session.reset(); - } - - // Then can run more Tx - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 2" ); - tx.success(); - } - } - } - @Test public void executeReadTxInReadSession() { @@ -892,80 +610,6 @@ public void writeTxRolledBackWhenMarkedAsSuccessAndThrowsException() } } - @Test - public void resetShouldStopQueryWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = driver.session() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - StatementResult result = updateNodeId( session, nodeId, newNodeId ); - result.consume(); - } - } - } ); - } - - @Test - public void resetShouldStopTransactionWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - public void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = neo4j.driver().session(); - Transaction tx = session.beginTransaction() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - } - } - } ); - } - - @Test - public void resetShouldStopWriteTransactionWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - final AtomicInteger invocationsOfWork = new AtomicInteger(); - - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - public void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = driver.session() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - - session.writeTransaction( tx -> - { - invocationsOfWork.incrementAndGet(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - return null; - } ); - } - } - } ); - - assertEquals( 1, invocationsOfWork.get() ); - } - @Test public void transactionRunShouldFailOnDeadlocks() throws Exception { @@ -1624,11 +1268,32 @@ public void shouldConsumeWithFailure() } } - private void assumeServerIs31OrLater() + @Test + public void shouldNotAllowStartingMultipleTransactions() { - ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); - assumeTrue( "Ignored on `" + serverVersion + "`", - serverVersion.greaterThanOrEqual( v3_1_0 ) ); + try ( Session session = neo4j.driver().session() ) + { + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + for ( int i = 0; i < 3; i++ ) + { + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + } + + tx.close(); + + assertNotNull( session.beginTransaction() ); + } } private void testExecuteReadTx( AccessMode sessionMode ) @@ -1719,43 +1384,6 @@ private void testTxRollbackWhenFunctionThrows( AccessMode sessionMode ) } - @SuppressWarnings( "deprecation" ) - private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception - { - int nodeId = 42; - int newNodeId1 = 4242; - int newNodeId2 = 424242; - - createNodeWithId( nodeId ); - - CountDownLatch nodeLocked = new CountDownLatch( 1 ); - AtomicReference otherSessionRef = new AtomicReference<>(); - - try ( Session session = neo4j.driver().session(); - Transaction tx = session.beginTransaction() ) - { - Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); - - StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); - result.consume(); - tx.success(); - - nodeLocked.countDown(); - // give separate thread some time to block on a lock - Thread.sleep( 2_000 ); - otherSessionRef.get().reset(); - - assertTransactionTerminated( txResult ); - } - - try ( Session session = neo4j.driver().session() ) - { - StatementResult result = session.run( "MATCH (n) RETURN n.id AS id" ); - int value = result.single().get( "id" ).asInt(); - assertEquals( newNodeId2, value ); - } - } - private Driver newDriverWithoutRetries() { return newDriverWithFixedRetries( 0 ); @@ -1818,20 +1446,6 @@ private static StatementResult updateNodeId( StatementRunner statementRunner, in parameters( "currentId", currentId, "newId", newId ) ); } - private static void assertTransactionTerminated( Future work ) throws Exception - { - try - { - work.get( 20, TimeUnit.SECONDS ); - fail( "Exception expected" ); - } - catch ( ExecutionException e ) - { - assertThat( e.getCause(), instanceOf( TransientException.class ) ); - assertThat( e.getCause().getMessage(), startsWith( "The transaction has been terminated" ) ); - } - } - private static boolean assertOneOfTwoFuturesFailWithDeadlock( Future future1, Future future2 ) throws Exception { @@ -1887,22 +1501,6 @@ private static void await( CountDownLatch latch ) } } - private abstract class NodeIdUpdater - { - final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, - CountDownLatch latchToWait ) - { - return executeInDifferentThread( () -> - { - performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); - return null; - } ); - } - - abstract void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception; - } - private static class ThrowingWork implements TransactionWork { final String query; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java new file mode 100644 index 0000000000..ff66de026d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java @@ -0,0 +1,922 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.integration; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.channels.ClosedChannelException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.StatementRunner; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.util.TestNeo4j; + +import static java.util.Collections.newSetFromMap; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.IntStream.range; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; +import static org.neo4j.driver.v1.Values.parameters; +import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; +import static org.neo4j.driver.v1.util.Neo4jRunner.HOME_DIR; +import static org.neo4j.driver.v1.util.Neo4jSettings.IMPORT_DIR; +import static org.neo4j.driver.v1.util.Neo4jSettings.TEST_SETTINGS; +import static org.neo4j.driver.v1.util.TestUtil.activeQueryCount; +import static org.neo4j.driver.v1.util.TestUtil.activeQueryNames; +import static org.neo4j.driver.v1.util.TestUtil.awaitCondition; + +@SuppressWarnings( "deprecation" ) +public class SessionResetIT +{ + private static final int CSV_FILE_SIZE = 10_000; + private static final int LOAD_CSV_BATCH_SIZE = 10; + + private static final String SHORT_QUERY_1 = "CREATE (n:Node {name: 'foo', occupation: 'bar'})"; + private static final String SHORT_QUERY_2 = "MATCH (n:Node {name: 'foo'}) RETURN count(n)"; + private static final String LONG_QUERY = "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n"; + private static final String LONG_PERIODIC_COMMIT_QUERY_TEMPLATE = + "USING PERIODIC COMMIT 1 " + + "LOAD CSV FROM '%s' AS line " + + "UNWIND range(1, " + LOAD_CSV_BATCH_SIZE + ") AS index " + + "CREATE (n:Node {id: index, name: line[0], occupation: line[1]})"; + + private static final int STRESS_TEST_THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2; + private static final long STRESS_TEST_DURATION_MS = SECONDS.toMillis( 5 ); + private static final String[] STRESS_TEST_QUERIES = {SHORT_QUERY_1, SHORT_QUERY_2, LONG_QUERY}; + + @Rule + public final TestNeo4j neo4j = new TestNeo4j(); + + private ExecutorService executor; + + @Before + public void setUp() + { + executor = Executors.newCachedThreadPool( daemon( getClass().getSimpleName() + "-thread" ) ); + } + + @After + public void tearDown() + { + if ( executor != null ) + { + executor.shutdownNow(); + } + } + + @Test + public void shouldTerminateAutoCommitQuery() throws Exception + { + testQueryTermination( LONG_QUERY, true ); + } + + @Test + public void shouldTerminateQueryInExplicitTransaction() throws Exception + { + testQueryTermination( LONG_QUERY, false ); + } + + /** + * It is currently unsafe to terminate periodic commit query because it'll then be half-committed. + */ + @Test + public void shouldNotTerminatePeriodicCommitQuery() throws Exception + { + Future queryResult = runQueryInDifferentThreadAndResetSession( longPeriodicCommitQuery(), true ); + + try + { + queryResult.get( 1, MINUTES ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( Neo4jException.class ) ); + } + awaitNoActiveQueries(); + + assertEquals( CSV_FILE_SIZE * LOAD_CSV_BATCH_SIZE, countNodes() ); + } + + @Test + public void shouldTerminateAutoCommitQueriesRandomly() throws Exception + { + testRandomQueryTermination( true ); + } + + @Test + public void shouldTerminateQueriesInExplicitTransactionsRandomly() throws Exception + { + testRandomQueryTermination( false ); + } + + @Test + public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + try ( Session session = neo4j.driver().session() ) + { + Transaction tx1 = session.beginTransaction(); + + tx1.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + + try + { + tx1.run( "RETURN 1" ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); + } + } + } + + @Test + public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + Session session = neo4j.driver().session(); + session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } + } + + @Test + public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + try ( Session session = neo4j.driver().session() ) + { + Transaction tx1 = session.beginTransaction(); + + StatementResult procedureResult = tx1.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + procedureResult.consume(); + fail( "Should procedure throw an exception as we interrupted procedure call" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } + finally + { + tx1.close(); + } + + try ( Transaction tx2 = session.beginTransaction() ) + { + tx2.run( "CREATE (n:FirstNode)" ); + tx2.success(); + } + + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + MatcherAssert.assertThat( nodes, equalTo( 1L ) ); + } + } + + @Test + public void shouldKillLongRunningStatement() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + result.consume();// blocking to run the statement + + fail( "Should have got an exception about statement get killed." ); + } + catch ( Neo4jException e ) + { + endTime = System.currentTimeMillis(); + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + catch ( Exception e ) + { + fail( "Should be a Neo4jException" ); + } + } + + @Test + public void shouldKillLongStreamingResult() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + int recordCount = 0; + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + while ( result.hasNext() ) + { + result.next(); + recordCount++; + } + + fail( "Should have got an exception about streaming get killed." ); + } + catch ( ClientException e ) + { + endTime = System.currentTimeMillis(); + assertThat( e.code(), equalTo( "Neo.ClientError.Procedure.ProcedureCallFailed" ) ); + assertThat( recordCount, greaterThan( 1 ) ); + + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + } + + private void resetSessionAfterTimeout( Session session, int timeout ) + { + executor.submit( () -> + { + try + { + Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds + } + catch ( InterruptedException ignore ) + { + } + finally + { + session.reset(); // reset the session after timeout + } + } ); + } + + @Test + public void shouldAllowMoreStatementAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + + session.run( "RETURN 1" ).consume(); + + // When reset the state of this session + session.reset(); + + // Then can run successfully more statements without any error + session.run( "RETURN 2" ).consume(); + } + } + + @Test + public void shouldAllowMoreTxAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 1" ); + tx.success(); + } + + // When reset the state of this session + session.reset(); + + // Then can run more Tx + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 2" ); + tx.success(); + } + } + } + + @SuppressWarnings( "deprecation" ) + @Test + public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction tx = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + // Then + tx.run( "RETURN 1" ); + fail( "Should not allow tx run as tx is already failed." ); + } + catch ( Exception e ) + { + assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); + } + } + } + + @SuppressWarnings( "deprecation" ) + @Test + public void shouldAllowMoreTxAfterSessionResetInTx() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction ignore = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + } + + // Then can run more Tx + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 2" ); + tx.success(); + } + } + } + + @Test + public void resetShouldStopQueryWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( session, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test + public void resetShouldStopTransactionWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test + public void resetShouldStopWriteTransactionWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + AtomicInteger invocationsOfWork = new AtomicInteger(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + + session.writeTransaction( tx -> + { + invocationsOfWork.incrementAndGet(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + return null; + } ); + } + } + } ); + + assertEquals( 1, invocationsOfWork.get() ); + } + + @Test + public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() + { + try ( Session session = neo4j.driver().session() ) + { + // Given + session.reset(); + + // When + Transaction tx = session.beginTransaction(); + tx.run( "CREATE (n:FirstNode)" ); + tx.success(); + tx.close(); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } + } + + @Test + public void shouldHandleResetBeforeRun() + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + session.reset(); + + try + { + tx.run( "CREATE (n:FirstNode)" ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); + } + } + } + + @Test + public void shouldHandleResetFromMultipleThreads() throws Throwable + { + Session session = neo4j.driver().session(); + + CountDownLatch beforeCommit = new CountDownLatch( 1 ); + CountDownLatch afterReset = new CountDownLatch( 1 ); + + executor.submit( () -> + { + try ( Transaction tx1 = session.beginTransaction() ) + { + tx1.run( "CREATE (n:FirstNode)" ); + beforeCommit.countDown(); + afterReset.await(); + } + + try ( Transaction tx2 = session.beginTransaction() ) + { + tx2.run( "CREATE (n:FirstNode)" ); + tx2.success(); + } + + return null; + } ); + + executor.submit( () -> + { + beforeCommit.await(); + session.reset(); + afterReset.countDown(); + return null; + } ); + + executor.shutdown(); + executor.awaitTermination( 10, SECONDS ); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } + + private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception + { + int nodeId = 42; + int newNodeId1 = 4242; + int newNodeId2 = 424242; + + createNodeWithId( nodeId ); + + CountDownLatch nodeLocked = new CountDownLatch( 1 ); + AtomicReference otherSessionRef = new AtomicReference<>(); + + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); + + StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); + result.consume(); + tx.success(); + + nodeLocked.countDown(); + // give separate thread some time to block on a lock + Thread.sleep( 2_000 ); + otherSessionRef.get().reset(); + + assertTransactionTerminated( txResult ); + } + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n) RETURN n.id AS id" ); + int value = result.single().get( "id" ).asInt(); + assertEquals( newNodeId2, value ); + } + } + + private void createNodeWithId( int id ) + { + try ( Session session = neo4j.driver().session() ) + { + session.run( "CREATE (n {id: {id}})", parameters( "id", id ) ); + } + } + + private static StatementResult updateNodeId( StatementRunner statementRunner, int currentId, int newId ) + { + return statementRunner.run( "MATCH (n {id: {currentId}}) SET n.id = {newId}", + parameters( "currentId", currentId, "newId", newId ) ); + } + + private static void assertTransactionTerminated( Future work ) throws Exception + { + try + { + work.get( 20, TimeUnit.SECONDS ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), CoreMatchers.instanceOf( TransientException.class ) ); + assertThat( e.getCause().getMessage(), startsWith( "The transaction has been terminated" ) ); + } + } + + private void testRandomQueryTermination( boolean autoCommit ) throws Exception + { + Set runningSessions = newSetFromMap( new ConcurrentHashMap<>() ); + AtomicBoolean stop = new AtomicBoolean(); + List> futures = new ArrayList<>(); + + for ( int i = 0; i < STRESS_TEST_THREAD_COUNT; i++ ) + { + futures.add( executor.submit( () -> + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while ( !stop.get() ) + { + runRandomQuery( autoCommit, random, runningSessions, stop ); + } + } ) ); + } + + long deadline = System.currentTimeMillis() + STRESS_TEST_DURATION_MS; + while ( !stop.get() ) + { + if ( System.currentTimeMillis() > deadline ) + { + stop.set( true ); + } + + resetAny( runningSessions ); + + MILLISECONDS.sleep( 30 ); + } + + awaitAll( futures ); + awaitNoActiveQueries(); + } + + private void runRandomQuery( boolean autoCommit, Random random, Set runningSessions, AtomicBoolean stop ) + { + try + { + Session session = neo4j.driver().session(); + runningSessions.add( session ); + try + { + String query = STRESS_TEST_QUERIES[random.nextInt( STRESS_TEST_QUERIES.length - 1 )]; + runQuery( session, query, autoCommit ); + } + finally + { + runningSessions.remove( session ); + session.close(); + } + } + catch ( Throwable error ) + { + if ( !stop.get() && !isAcceptable( error ) ) + { + stop.set( true ); + throw error; + } + // else it is fine to receive some errors from the driver because + // sessions are being reset concurrently by the main thread, driver can also be closed concurrently + } + } + + private void testQueryTermination( String query, boolean autoCommit ) throws Exception + { + Future queryResult = runQueryInDifferentThreadAndResetSession( query, autoCommit ); + + try + { + queryResult.get( 10, SECONDS ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( Neo4jException.class ) ); + } + + awaitNoActiveQueries(); + } + + private Future runQueryInDifferentThreadAndResetSession( String query, boolean autoCommit ) + { + AtomicReference sessionRef = new AtomicReference<>(); + + Future queryResult = runAsync( () -> + { + Session session = neo4j.driver().session(); + sessionRef.set( session ); + runQuery( session, query, autoCommit ); + } ); + + awaitActiveQueriesToContain( query ); + + Session session = sessionRef.get(); + assertNotNull( session ); + session.reset(); + + return queryResult; + } + + private static void runQuery( Session session, String query, boolean autoCommit ) + { + if ( autoCommit ) + { + session.run( query ).consume(); + } + else + { + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( query ); + tx.success(); + } + } + } + + private void awaitNoActiveQueries() + { + awaitCondition( () -> activeQueryCount( neo4j.driver() ) == 0 ); + } + + private void awaitActiveQueriesToContain( String value ) + { + awaitCondition( () -> + activeQueryNames( neo4j.driver() ).stream().anyMatch( query -> query.contains( value ) ) ); + } + + private long countNodes() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n) RETURN count(n) AS result" ); + return result.single().get( 0 ).asLong(); + } + } + + private static void resetAny( Set sessions ) + { + sessions.stream().findAny().ifPresent( session -> + { + if ( sessions.remove( session ) ) + { + resetSafely( session ); + } + } ); + } + + private static void resetSafely( Session session ) + { + try + { + if ( session.isOpen() ) + { + session.reset(); + } + } + catch ( ClientException e ) + { + if ( session.isOpen() ) + { + throw e; + } + // else this thread lost race with close and it's fine + } + } + + private static boolean isAcceptable( Throwable error ) + { + // get the root cause + while ( error.getCause() != null ) + { + error = error.getCause(); + } + + return isTransactionTerminatedException( error ) || + error instanceof ServiceUnavailableException || + error instanceof ClientException || + error instanceof ClosedChannelException; + } + + private static boolean isTransactionTerminatedException( Throwable error ) + { + return error instanceof TransientException && + error.getMessage().startsWith( "The transaction has been terminated" ); + } + + private static String longPeriodicCommitQuery() + { + URI fileUri = createTmpCsvFile(); + return String.format( LONG_PERIODIC_COMMIT_QUERY_TEMPLATE, fileUri ); + } + + private static URI createTmpCsvFile() + { + try + { + Path importDir = Paths.get( HOME_DIR, TEST_SETTINGS.propertiesMap().get( IMPORT_DIR ) ); + Path csvFile = Files.createTempFile( importDir, "test", ".csv" ); + Iterable lines = range( 0, CSV_FILE_SIZE ).mapToObj( i -> "Foo-" + i + ", Bar-" + i )::iterator; + return URI.create( "file:///" + Files.write( csvFile, lines ).getFileName() ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + private static void awaitAll( List> futures ) throws Exception + { + for ( Future future : futures ) + { + assertNull( future.get( 1, MINUTES ) ); + } + } + + private void assumeServerIs31OrLater() + { + ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); + assumeTrue( "Ignored on `" + serverVersion + "`", serverVersion.greaterThanOrEqual( v3_1_0 ) ); + } + + private abstract class NodeIdUpdater + { + final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, + CountDownLatch latchToWait ) + { + return executor.submit( () -> + { + performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); + return null; + } ); + } + + abstract void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index 18ea98791e..c9ebc981cd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -653,15 +653,16 @@ public void shouldFailWhenListTransformationFunctionFails() } @Test - public void shouldFailWhenServerIsRestarted() + public void shouldFailToCommitWhenServerIsRestarted() { Transaction tx = await( session.beginTransactionAsync() ); + await( tx.runAsync( "CREATE ()" ) ); + neo4j.killDb(); try { - await( tx.runAsync( "CREATE ()" ) ); await( tx.commitAsync() ); fail( "Exception expected" ); } @@ -806,9 +807,8 @@ public void shouldFailToCommitAfterTermination() } catch ( ClientException e ) { - assertEquals( "Can't commit, transaction has been terminated by `Session#reset()`", e.getMessage() ); + assertEquals( "Can't commit, transaction has been terminated", e.getMessage() ); } - assertFalse( tx.isOpen() ); } @Test @@ -924,8 +924,7 @@ public void shouldFailToRunQueryWhenTerminated() } catch ( ClientException e ) { - assertEquals( "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`", - e.getMessage() ); + assertEquals( "Cannot run more statements in this transaction, it has been terminated", e.getMessage() ); } } @@ -1283,6 +1282,32 @@ public void shouldAllowUsingBlockingApiInCommonPoolWhenChaining() assertEquals( 1, countNodes( 42 ) ); } + @Test + public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() + { + Transaction tx1 = await( session.beginTransactionAsync() ); + ((ExplicitTransaction) tx1).markTerminated(); + + try + { + // commit should fail, make session forget about this transaction and release the connection to the pool + await( tx1.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Can't commit, transaction has been terminated", e.getMessage() ); + } + + await( session.beginTransactionAsync() + .thenCompose( tx -> tx.runAsync( "CREATE (:Node {id: 42})" ) + .thenCompose( StatementResultCursor::consumeAsync ) + .thenApply( ignore -> tx ) + ).thenCompose( Transaction::commitAsync ) ); + + assertEquals( 1, countNodes( 42 ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index b4da5c71da..be4741a759 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -18,15 +18,20 @@ */ package org.neo4j.driver.v1.integration; +import io.netty.channel.Channel; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -34,17 +39,21 @@ import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.util.StubServer; import org.neo4j.driver.v1.util.TestNeo4jSession; import org.neo4j.driver.v1.util.TestUtil; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT; public class TransactionIT { @@ -53,8 +62,6 @@ public class TransactionIT @Rule public TestNeo4jSession session = new TestNeo4jSession(); - private Transaction globalTx; - @Test public void shouldRunAndCommit() throws Throwable { @@ -224,95 +231,6 @@ public void shouldHandleNullMapParameters() throws Throwable // Then it wasn't the end of the world as we know it } - @SuppressWarnings( "deprecation" ) - @Test - public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() throws Throwable - { - // Given - session.reset(); - - // When - Transaction tx = session.beginTransaction(); - tx.run( "CREATE (n:FirstNode)" ); - tx.success(); - tx.close(); - - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldHandleResetBeforeRun() throws Throwable - { - // Expect - exception.expect( ClientException.class ); - exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated by" ); - // When - Transaction tx = session.beginTransaction(); - session.reset(); - tx.run( "CREATE (n:FirstNode)" ); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldHandleResetFromMultipleThreads() throws Throwable - { - // When - ExecutorService runner = Executors.newFixedThreadPool( 2 ); - runner.execute( new Runnable() - - { - @Override - public void run() - { - globalTx = session.beginTransaction(); - globalTx.run( "CREATE (n:FirstNode)" ); - try - { - Thread.sleep( 1000 ); - } - catch ( InterruptedException e ) - { - throw new AssertionError( e ); - } - - globalTx = session.beginTransaction(); - globalTx.run( "CREATE (n:FirstNode)" ); - globalTx.success(); - globalTx.close(); - - } - } ); - runner.execute( new Runnable() - - { - @Override - public void run() - { - try - { - Thread.sleep( 500 ); - } - catch ( InterruptedException e ) - { - throw new AssertionError( e ); - } - - session.reset(); - } - } ); - - runner.awaitTermination( 5, TimeUnit.SECONDS ); - - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); - } - @Test public void shouldRollBackTxIfErrorWithoutConsume() throws Throwable { @@ -391,7 +309,7 @@ public void shouldPropagateFailureFromSummary() @Test public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception { - try ( Session otherSession = this.session.driver().session() ) + try ( Session otherSession = session.driver().session() ) { session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); @@ -425,7 +343,7 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc @Test public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exception { - try ( Session otherSession = this.session.driver().session() ) + try ( Session otherSession = session.driver().session() ) { session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); @@ -458,4 +376,105 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exc } } } + + @Test + public void shouldThrowWhenConnectionKilledDuringTransaction() + { + testFailWhenConnectionKilledDuringTransaction( false ); + } + + @Test + public void shouldThrowWhenConnectionKilledDuringTransactionMarkedForSuccess() + { + testFailWhenConnectionKilledDuringTransaction( true ); + } + + @Test + public void shouldThrowCommitError() throws Exception + { + testTxCloseErrorPropagation( "commit_error.script", true, "Unable to commit" ); + } + + @Test + public void shouldThrowRollbackError() throws Exception + { + testTxCloseErrorPropagation( "rollback_error.script", false, "Unable to rollback" ); + } + + private void testFailWhenConnectionKilledDuringTransaction( boolean markForSuccess ) + { + ChannelTrackingDriverFactory factory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); + RoutingSettings instance = new RoutingSettings( 1, 0 ); + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + + try ( Driver driver = factory.newInstance( session.uri(), session.authToken(), instance, DEFAULT, config ) ) + { + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + tx.run( "CREATE (:MyNode {id: 1})" ).consume(); + + if ( markForSuccess ) + { + tx.success(); + } + + // kill all network channels + for ( Channel channel : factory.channels() ) + { + channel.close().syncUninterruptibly(); + } + + tx.run( "CREATE (:MyNode {id: 1})" ).consume(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) ); + } + } + + assertEquals( 0, session.run( "MATCH (n:MyNode {id: 1}) RETURN count(n)" ).single().get( 0 ).asInt() ); + } + + private static void testTxCloseErrorPropagation( String script, boolean commit, String expectedErrorMessage ) + throws Exception + { + StubServer server = StubServer.start( script, 9001 ); + try + { + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withoutEncryption().toConfig(); + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", AuthTokens.none(), config ); + Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + StatementResult result = tx.run( "CREATE (n {name:'Alice'}) RETURN n.name AS name" ); + assertEquals( "Alice", result.single().get( "name" ).asString() ); + + if ( commit ) + { + tx.success(); + } + else + { + tx.failure(); + } + + try + { + tx.close(); + fail( "Exception expected" ); + } + catch ( TransientException e ) + { + assertEquals( "Neo.TransientError.General.DatabaseUnavailable", e.code() ); + assertEquals( expectedErrorMessage, e.getMessage() ); + } + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index 8634b421b8..db11cd18c7 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -30,7 +30,9 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; @@ -39,9 +41,12 @@ import org.neo4j.driver.v1.StatementResult; import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -49,6 +54,8 @@ public final class TestUtil { + private static final long DEFAULT_WAIT_TIME_MS = MINUTES.toMillis( 1 ); + private TestUtil() { } @@ -84,7 +91,7 @@ public static > T await( U future ) { try { - return future.get( 5, MINUTES ); + return future.get( DEFAULT_WAIT_TIME_MS, MILLISECONDS ); } catch ( InterruptedException e ) { @@ -200,6 +207,50 @@ public static void interruptWhenInWaitingState( Thread thread ) } ); } + public static int activeQueryCount( Driver driver ) + { + return activeQueryNames( driver ).size(); + } + + public static List activeQueryNames( Driver driver ) + { + try ( Session session = driver.session() ) + { + return session.run( "CALL dbms.listQueries() YIELD query RETURN query" ) + .list() + .stream() + .map( record -> record.get( 0 ).asString() ) + .filter( query -> !query.contains( "dbms.listQueries" ) ) // do not include listQueries procedure + .collect( toList() ); + } + } + + public static void awaitCondition( BooleanSupplier condition ) + { + awaitCondition( condition, DEFAULT_WAIT_TIME_MS, MILLISECONDS ); + } + + public static void awaitCondition( BooleanSupplier condition, long value, TimeUnit unit ) + { + long deadline = System.currentTimeMillis() + unit.toMillis( value ); + while ( !condition.getAsBoolean() ) + { + if ( System.currentTimeMillis() > deadline ) + { + fail( "Condition was not met in time" ); + } + try + { + MILLISECONDS.sleep( 100 ); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + fail( "Interrupted while waiting" ); + } + } + } + private static void setupSuccessfulPullAll( Connection connection, String statement ) { doAnswer( invocation -> diff --git a/driver/src/test/resources/commit_error.script b/driver/src/test/resources/commit_error.script new file mode 100644 index 0000000000..1c4ac21674 --- /dev/null +++ b/driver/src/test/resources/commit_error.script @@ -0,0 +1,18 @@ +!: AUTO INIT +!: AUTO RESET + +C: RUN "BEGIN" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Alice'}) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + SUCCESS {} +C: RUN "COMMIT" {} + PULL_ALL +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to commit"} + IGNORED +C: ACK_FAILURE +S: SUCCESS {} diff --git a/driver/src/test/resources/rollback_error.script b/driver/src/test/resources/rollback_error.script new file mode 100644 index 0000000000..637be70b2a --- /dev/null +++ b/driver/src/test/resources/rollback_error.script @@ -0,0 +1,18 @@ +!: AUTO INIT +!: AUTO RESET + +C: RUN "BEGIN" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Alice'}) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + SUCCESS {} +C: RUN "ROLLBACK" {} + PULL_ALL +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to rollback"} + IGNORED +C: ACK_FAILURE +S: SUCCESS {}