diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 9f89c93894..fe323e19c0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -49,10 +49,13 @@ public class NetworkSession implements Session @Override public void run() { - if ( currentTransaction != null ) + synchronized ( NetworkSession.this ) { - lastBookmark = currentTransaction.bookmark(); - currentTransaction = null; + if ( currentTransaction != null ) + { + lastBookmark = currentTransaction.bookmark(); + currentTransaction = null; + } } } }; @@ -73,9 +76,9 @@ public StatementResult run( String statementText ) } @Override - public StatementResult run( String statementText, Map statementParameters ) + public StatementResult run( String statementText, Map statementParameters ) { - Value params = statementParameters == null ? Values.EmptyMap : value(statementParameters); + Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters ); return run( statementText, params ); } @@ -97,21 +100,24 @@ public StatementResult run( Statement statement ) { ensureConnectionIsValidBeforeRunningSession(); InternalStatementResult cursor = new InternalStatementResult( connection, null, statement ); - connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() ); + connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), + cursor.runResponseCollector() ); connection.pullAll( cursor.pullAllResponseCollector() ); connection.flush(); return cursor; } - public void reset() + public synchronized void reset() { ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureConnectionIsOpen(); - if( currentTransaction != null ) + if ( currentTransaction != null ) { currentTransaction.markToClose(); + lastBookmark = currentTransaction.bookmark(); + currentTransaction = null; } connection.resetAsync(); } @@ -126,21 +132,24 @@ public boolean isOpen() public void close() { // Use atomic operation to protect from closing the connection twice (putting back to the pool twice). - if( !isOpen.compareAndSet( true, false ) ) + if ( !isOpen.compareAndSet( true, false ) ) { throw new ClientException( "This session has already been closed." ); } else { - if ( currentTransaction != null ) + synchronized ( this ) { - try - { - currentTransaction.close(); - } - catch ( Throwable e ) + if ( currentTransaction != null ) { - // Best-effort + try + { + currentTransaction.close(); + } + catch ( Throwable e ) + { + // Best-effort + } } } try @@ -167,7 +176,7 @@ public Transaction beginTransaction() } @Override - public Transaction beginTransaction( String bookmark ) + public synchronized Transaction beginTransaction( String bookmark ) { ensureConnectionIsValidBeforeOpeningTransaction(); currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark ); @@ -224,7 +233,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction() @Override protected void finalize() throws Throwable { - if( isOpen.compareAndSet( true, false ) ) + if ( isOpen.compareAndSet( true, false ) ) { logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " + "method on Sessions before disposing of the objects.", null ); @@ -235,7 +244,7 @@ protected void finalize() throws Throwable private void ensureNoUnrecoverableError() { - if( connection.hasUnrecoverableErrors() ) + if ( connection.hasUnrecoverableErrors() ) { throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " + "has happened. Please close the current session and re-run your statement in a" + @@ -243,6 +252,7 @@ private void ensureNoUnrecoverableError() } } + //should be called from a synchronized block private void ensureNoOpenTransactionBeforeRunningSession() { if ( currentTransaction != null ) @@ -252,6 +262,7 @@ private void ensureNoOpenTransactionBeforeRunningSession() } } + //should be called from a synchronized block private void ensureNoOpenTransactionBeforeOpeningTransaction() { if ( currentTransaction != null ) @@ -273,12 +284,13 @@ private void ensureConnectionIsOpen() private void ensureSessionIsOpen() { - if( !isOpen() ) + if ( !isOpen() ) { throw new ClientException( "No more interaction with this session is allowed " + "as the current session is already closed or marked as closed. " + - "You get this error either because you have a bad reference to a session that has already be closed " + + "You get this error either because you have a bad reference to a session that has already be " + + "closed " + "or you are trying to reuse a session that you have called `reset` on it." ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index 8d1a568d68..0f7644a963 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -208,9 +208,9 @@ public void resetAsync() } @Override - public boolean isInterrupted() + public boolean isAckFailureMuted() { - return delegate.isInterrupted(); + return delegate.isAckFailureMuted(); } private void markAsAvailable() diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 43de6f0ab5..80e9b63fa7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -46,7 +46,8 @@ public class SocketConnection implements Connection { private final Queue pendingMessages = new LinkedList<>(); private final SocketResponseHandler responseHandler; - private AtomicBoolean interrupted = new AtomicBoolean( false ); + private AtomicBoolean isInterrupted = new AtomicBoolean( false ); + private AtomicBoolean isAckFailureMuted = new AtomicBoolean( false ); private final Collector.InitCollector initCollector = new Collector.InitCollector(); private final SocketClient socket; @@ -115,6 +116,8 @@ public void sync() @Override public synchronized void flush() { + ensureNotInterrupted(); + try { socket.send( pendingMessages ); @@ -126,6 +129,29 @@ public synchronized void flush() } } + private void ensureNotInterrupted() + { + try + { + if( isInterrupted.get() ) + { + // receive each of it and throw error immediately + while ( responseHandler.collectorsWaiting() > 0 ) + { + receiveOne(); + } + } + } + catch ( Neo4jException e ) + { + throw new ClientException( + "An error has occurred due to the cancellation of executing a previous statement. " + + "You received this error probably because you did not consume the result immediately after " + + "running the statement which get reset in this session.", e ); + } + + } + private void receiveAll() { try @@ -159,6 +185,7 @@ private void assertNoServerFailure() { Neo4jException exception = responseHandler.serverFailure(); responseHandler.clearError(); + isInterrupted.set( false ); throw exception; } } @@ -182,6 +209,8 @@ else if ( e instanceof SocketTimeoutException ) private synchronized void queueMessage( Message msg, Collector collector ) { + ensureNotInterrupted(); + pendingMessages.add( msg ); responseHandler.appendResultCollector( collector ); } @@ -211,26 +240,26 @@ public boolean hasUnrecoverableErrors() } @Override - public void resetAsync() + public synchronized void resetAsync() { - if( interrupted.compareAndSet( false, true ) ) + queueMessage( RESET, new Collector.ResetCollector( new Runnable() { - queueMessage( RESET, new Collector.ResetCollector( new Runnable() + @Override + public void run() { - @Override - public void run() - { - interrupted.set( false ); - } - } ) ); - flush(); - } + isInterrupted.set( false ); + isAckFailureMuted.set( false ); + } + } ) ); + flush(); + isInterrupted.set( true ); + isAckFailureMuted.set( true ); } @Override - public boolean isInterrupted() + public boolean isAckFailureMuted() { - return interrupted.get(); + return isAckFailureMuted.get(); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java index 4542c07ec5..0de4c92a52 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java @@ -226,9 +226,9 @@ public void resetAsync() } @Override - public boolean isInterrupted() + public boolean isAckFailureMuted() { - return delegate.isInterrupted(); + return delegate.isAckFailureMuted(); } @Override @@ -260,7 +260,7 @@ private void onDelegateException( RuntimeException e ) { unrecoverableErrorsOccurred = true; } - else if( !isInterrupted() ) + else if( !isAckFailureMuted() ) { ackFailure(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index ed98953ce5..b33f98579d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -114,10 +114,10 @@ public interface Connection extends AutoCloseable void resetAsync(); /** - * Return true if the current session statement execution has been interrupted by another thread, otherwise false. - * @return true if the current session statement execution has been interrupted by another thread, otherwise false + * Return true if ack_failure message is temporarily muted as the failure message will be acked using reset instead + * @return true if no ack_failre message should be sent when ackable failures are received. */ - boolean isInterrupted(); + boolean isAckFailureMuted(); /** * Returns the version of the server connected to. diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 83543355e0..46a2679c5f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -18,8 +18,10 @@ */ package org.neo4j.driver.v1.integration; +import org.hamcrest.MatcherAssert; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.AuthTokens; @@ -30,9 +32,11 @@ import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; +import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.util.TestNeo4j; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertFalse; @@ -46,6 +50,9 @@ public class SessionIT @Rule public TestNeo4j neo4j = new TestNeo4j(); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test public void shouldKnowSessionIsClosed() throws Throwable { @@ -161,7 +168,7 @@ public void shouldKillLongStreamingResult() throws Throwable recordCount++; } - fail("Should have got an exception about statement get killed."); + fail("Should have got an exception about streaming get killed."); } catch( ClientException e ) { @@ -175,6 +182,96 @@ public void shouldKillLongStreamingResult() throws Throwable } } + @Test + public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable + { + // Given + neo4j.ensureProcedures( "longRunningStatement.jar" ); + Driver driver = GraphDatabase.driver( neo4j.uri() ); + + try( Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + + tx.run("CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + Thread.sleep( 1* 1000 ); + session.reset(); + + exception.expect( ClientException.class ); + exception.expectMessage( startsWith( + "An error has occurred due to the cancellation of executing a previous statement." ) ); + + // When & Then + tx = session.beginTransaction(); + assertThat( tx, notNullValue() ); + } + } + + @Test + public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable + { + // Given + neo4j.ensureProcedures( "longRunningStatement.jar" ); + Driver driver = GraphDatabase.driver( neo4j.uri() ); + + Session session = driver.session(); + session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + Thread.sleep( 1 * 1000 ); + session.reset(); + + exception.expect( ClientException.class ); + exception.expectMessage( startsWith( + "An error has occurred due to the cancellation of executing a previous statement." ) ); + + // When & Then + session.close(); + } + + @Test + public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable + { + // Given + neo4j.ensureProcedures( "longRunningStatement.jar" ); + Driver driver = GraphDatabase.driver( neo4j.uri() ); + + try( Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + + StatementResult procedureResult = tx.run("CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + Thread.sleep( 1* 1000 ); + session.reset(); + + try + { + procedureResult.consume(); + fail( "Should procedure call with an exception as we interrupted procedure call" ); + } + catch ( TransientException e ) + { + MatcherAssert.assertThat( e.getMessage(), startsWith( "The transaction has been terminated." ) ); + } + catch ( Throwable e ) + { + fail( "Expected exception is different from what we've received: " + e.getMessage() ); + } + + // When + tx = session.beginTransaction(); + tx.run( "CREATE (n:FirstNode)" ); + tx.success(); + tx.close(); + + // Then + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + MatcherAssert.assertThat( nodes, equalTo( 1L ) ); + } + } + private void resetSessionAfterTimeout( final Session session, final int timeout ) { new Thread( new Runnable() diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index dbe6de3750..fee670cf86 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -23,6 +23,9 @@ import org.junit.rules.ExpectedException; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.StatementResult; @@ -142,7 +145,7 @@ public void shouldBeOpenBeforeCommit() throws Throwable public void shouldHandleNullParametersGracefully() { // When - session.run("match (n) return count(n)", (Value)null); + session.run( "match (n) return count(n)", (Value) null ); // Then // pass - no exception thrown @@ -155,7 +158,7 @@ public void shouldHandleFailureAfterClosingTransaction() { // GIVEN a successful query in a transaction Transaction tx = session.beginTransaction(); - StatementResult result = tx.run("CREATE (n) RETURN n"); + StatementResult result = tx.run( "CREATE (n) RETURN n" ); result.consume(); tx.success(); tx.close(); @@ -164,7 +167,7 @@ public void shouldHandleFailureAfterClosingTransaction() exception.expect( ClientException.class ); //WHEN running a malformed query in the original session - session.run("CREAT (n) RETURN n").consume(); + session.run( "CREAT (n) RETURN n" ).consume(); } @SuppressWarnings( "ConstantConditions" ) @@ -204,11 +207,100 @@ public void shouldHandleNullMapParameters() throws Throwable // When try ( Transaction tx = session.beginTransaction() ) { - Map params = null; + Map params = null; tx.run( "CREATE (n:FirstNode)", params ); tx.success(); } // Then it wasn't the end of the world as we know it } + + @Test + public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() throws Throwable + { + // Given + session.reset(); + + // When + Transaction tx = session.beginTransaction(); + tx.run( "CREATE (n:FirstNode)" ); + tx.success(); + tx.close(); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } + + @Test + public void shouldHandleResetBeforeRun() throws Throwable + { + // Expect + exception.expect( ClientException.class ); + exception.expectMessage( "Cannot run more statements in this transaction, because previous statements in the " + + "transaction has failed and the transaction has been rolled back. Please start a new" + + " transaction to run another statement." ); + // When + Transaction tx = session.beginTransaction(); + session.reset(); + tx.run( "CREATE (n:FirstNode)" ); + } + + private Transaction globalTx = null; + @Test + public void shouldHandleResetFromMultipleThreads() throws Throwable + { + // When + ExecutorService runner = Executors.newFixedThreadPool( 2 ); + runner.execute( new Runnable() + + { + @Override + public void run() + { + globalTx = session.beginTransaction(); + globalTx.run( "CREATE (n:FirstNode)" ); + try + { + Thread.sleep( 1000 ); + } + catch ( InterruptedException e ) + { + new AssertionError( e ); + } + + globalTx = session.beginTransaction(); + globalTx.run( "CREATE (n:FirstNode)" ); + globalTx.success(); + globalTx.close(); + + } + } ); + runner.execute( new Runnable() + + { + @Override + public void run() + { + try + { + Thread.sleep( 500 ); + } + catch ( InterruptedException e ) + { + new AssertionError( e ); + } + + session.reset(); + } + } ); + + runner.awaitTermination( 5, TimeUnit.SECONDS ); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } }