diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index f2397c3460..7436745db2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) ); } - else if ( state == State.TERMINATED ) - { - transactionClosed( State.ROLLED_BACK ); - return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); - } else { return resultCursors.retrieveNotConsumedError() @@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK ) { return completedWithNull(); } - else if ( state == State.TERMINATED ) - { - // no need for explicit rollback, transaction should've been rolled back by the database - transactionClosed( State.ROLLED_BACK ); - return completedWithNull(); - } else { return resultCursors.retrieveNotConsumedError() @@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark ) private CompletionStage doCommitAsync() { + if ( state == State.TERMINATED ) + { + return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); + } + CompletableFuture commitFuture = new CompletableFuture<>(); ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this ); connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); @@ -352,6 +346,11 @@ private CompletionStage doCommitAsync() private CompletionStage doRollbackAsync() { + if ( state == State.TERMINATED ) + { + return completedWithNull(); + } + CompletableFuture rollbackFuture = new CompletableFuture<>(); ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture ); connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java index 16afecb32d..2c2b34d849 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java @@ -95,16 +95,8 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error ) else { failed = true; - - Throwable cause = error instanceof DecoderException ? error.getCause() : error; - if ( cause instanceof SSLHandshakeException ) - { - fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) ); - } - else - { - fail( ctx, cause ); - } + Throwable cause = transformError( error ); + fail( ctx, cause ); } } @@ -161,4 +153,21 @@ private static Throwable protocolNoSupportedByDriverError( int suggestedProtocol return new ClientException( "Protocol error, server suggested unexpected protocol version: " + suggestedProtocolVersion ); } + + private static Throwable transformError( Throwable error ) + { + Throwable cause = error instanceof DecoderException ? error.getCause() : error; + if ( cause instanceof ServiceUnavailableException ) + { + return cause; + } + else if ( cause instanceof SSLHandshakeException ) + { + return new SecurityException( "Failed to establish secured connection with the server", cause ); + } + else + { + return new ServiceUnavailableException( "Failed to establish connection with the server", cause ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 4fda29998f..fbff09ba0a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -101,7 +101,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error ) ctx.close(); } - private Throwable transformError( Throwable error ) + private static Throwable transformError( Throwable error ) { if ( error instanceof CodecException ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java index d20f58d622..80280a5edf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java @@ -18,8 +18,12 @@ */ package org.neo4j.driver.internal.cluster; +import static java.util.concurrent.TimeUnit.SECONDS; + public class RoutingSettings { + public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) ); + private final int maxRoutingFailures; private final long retryTimeoutDelay; private final RoutingContext routingContext; diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java deleted file mode 100644 index 4b6eb94593..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.handlers; - -import java.util.Map; - -import org.neo4j.driver.internal.Bookmark; -import org.neo4j.driver.internal.ExplicitTransaction; -import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.v1.Value; - -public class BookmarkResponseHandler implements ResponseHandler -{ - private final ExplicitTransaction tx; - - public BookmarkResponseHandler( ExplicitTransaction tx ) - { - this.tx = tx; - } - - @Override - public void onSuccess( Map metadata ) - { - Value bookmarkValue = metadata.get( "bookmark" ); - if ( bookmarkValue != null ) - { - tx.setBookmark( Bookmark.from( bookmarkValue.asString() ) ); - } - } - - @Override - public void onFailure( Throwable error ) - { - } - - @Override - public void onRecord( Value[] fields ) - { - } -} diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 1971ec3300..b29f8a1374 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -267,8 +267,8 @@ public static class ConfigBuilder private boolean encrypted = true; private TrustStrategy trustStrategy = trustAllCertificates(); private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED; - private int routingFailureLimit = 1; - private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 ); + private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures(); + private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay(); private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 ); private RetrySettings retrySettings = RetrySettings.DEFAULT; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java index 02cac4d285..f94a2da47c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java @@ -29,7 +29,9 @@ import org.junit.Test; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.ServerSocket; +import java.net.Socket; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -44,6 +46,7 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.TestNeo4j; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; @@ -186,6 +189,42 @@ public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() ); } + @Test + public void shouldThrowServiceUnavailableExceptionOnFailureDuringConnect() throws Exception + { + ServerSocket server = new ServerSocket( 0 ); + BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() ); + + runAsync( () -> + { + try + { + // wait for a connection + Socket socket = server.accept(); + // and terminate it immediately so that client gets a "reset by peer" IOException + socket.close(); + server.close(); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } ); + + ChannelConnector connector = newConnector( neo4j.authToken() ); + ChannelFuture channelFuture = connector.connect( address, bootstrap ); + + // connect operation should fail with ServiceUnavailableException + try + { + await( channelFuture ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException ignore ) + { + } + } + private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException { try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java index b572f869c0..2aee56d048 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java @@ -46,10 +46,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.HTTP; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1; +import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -85,9 +85,34 @@ public void shouldFailGivenPromiseWhenExceptionCaught() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( Exception e ) + catch ( ServiceUnavailableException e ) + { + assertEquals( cause, e.getCause() ); + } + + // channel should be closed + assertNull( await( channel.closeFuture() ) ); + } + + @Test + public void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught() + { + ChannelPromise handshakeCompletedPromise = channel.newPromise(); + HandshakeHandler handler = newHandler( handshakeCompletedPromise ); + channel.pipeline().addLast( handler ); + + ServiceUnavailableException error = new ServiceUnavailableException( "Bad error" ); + channel.pipeline().fireExceptionCaught( error ); + + try { - assertEquals( cause, e ); + // promise should fail + await( handshakeCompletedPromise ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( error, e ); } // channel should be closed @@ -112,9 +137,9 @@ public void shouldFailGivenPromiseWhenMultipleExceptionsCaught() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( RuntimeException e ) + catch ( ServiceUnavailableException e ) { - assertEquals( error1, e ); + assertEquals( error1, e.getCause() ); } // channel should be closed @@ -147,9 +172,9 @@ public void shouldUnwrapDecoderException() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( Exception e ) + catch ( ServiceUnavailableException e ) { - assertEquals( cause, e ); + assertEquals( cause, e.getCause() ); } // channel should be closed diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index e13dd2b8fd..4c99252ac0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import org.junit.After; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; @@ -58,6 +60,7 @@ import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.exceptions.TransientException; @@ -406,6 +409,7 @@ public String apply( Session session ) } @Test + @Ignore public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception { Cluster cluster = clusterRule.getCluster(); @@ -413,11 +417,12 @@ public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception try ( Driver driver = createDriver( leader.getRoutingUri() ) ) { + Set cores = cluster.cores(); for ( ClusterMember follower : cluster.followers() ) { cluster.kill( follower ); } - awaitLeaderToStepDown( driver ); + awaitLeaderToStepDown( cores ); // now we should be unable to write because majority of cores is down for ( int i = 0; i < 10; i++ ) @@ -436,6 +441,7 @@ public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception } @Test + @Ignore public void shouldServeReadsWhenMajorityOfCoresAreDead() throws Exception { Cluster cluster = clusterRule.getCluster(); @@ -462,11 +468,12 @@ public Integer execute( Transaction tx ) ensureNodeVisible( cluster, "Star Lord", bookmark ); + Set cores = cluster.cores(); for ( ClusterMember follower : cluster.followers() ) { cluster.kill( follower ); } - awaitLeaderToStepDown( driver ); + awaitLeaderToStepDown( cores ); // now we should be unable to write because majority of cores is down try ( Session session = driver.session( AccessMode.WRITE ) ) @@ -913,44 +920,27 @@ public Integer execute( Transaction tx ) } } - private void awaitLeaderToStepDown( Driver driver ) + private void awaitLeaderToStepDown( Set cores ) { - int leadersCount; - int followersCount; - int readReplicasCount; + long deadline = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS; + ClusterOverview overview = null; do { - try ( Session session = driver.session() ) + for ( ClusterMember core : cores ) { - int newLeadersCount = 0; - int newFollowersCount = 0; - int newReadReplicasCount = 0; - for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() ) + overview = fetchClusterOverview( core ); + if ( overview != null ) { - ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() ); - if ( role == ClusterMemberRole.LEADER ) - { - newLeadersCount++; - } - else if ( role == ClusterMemberRole.FOLLOWER ) - { - newFollowersCount++; - } - else if ( role == ClusterMemberRole.READ_REPLICA ) - { - newReadReplicasCount++; - } - else - { - throw new AssertionError( "Unknown role: " + role ); - } + break; } - leadersCount = newLeadersCount; - followersCount = newFollowersCount; - readReplicasCount = newReadReplicasCount; } } - while ( !(leadersCount == 0 && followersCount == 1 && readReplicasCount == 2) ); + while ( !isSingleFollowerWithReadReplicas( overview ) && System.currentTimeMillis() <= deadline ); + + if ( System.currentTimeMillis() > deadline ) + { + throw new IllegalStateException( "Leader did not step down in " + DEFAULT_TIMEOUT_MS + "ms. Last seen cluster overview: " + overview ); + } } private Driver createDriver( URI boltUri ) @@ -968,6 +958,43 @@ private Driver discoverDriver( List routingUris ) return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } + private ClusterOverview fetchClusterOverview( ClusterMember member ) + { + int leaderCount = 0; + int followerCount = 0; + int readReplicaCount = 0; + + Driver driver = clusterRule.getCluster().getDirectDriver( member ); + try ( Session session = driver.session() ) + { + for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() ) + { + ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() ); + if ( role == ClusterMemberRole.LEADER ) + { + leaderCount++; + } + else if ( role == ClusterMemberRole.FOLLOWER ) + { + followerCount++; + } + else if ( role == ClusterMemberRole.READ_REPLICA ) + { + readReplicaCount++; + } + else + { + throw new AssertionError( "Unknown role: " + role ); + } + } + return new ClusterOverview( leaderCount, followerCount, readReplicaCount ); + } + catch ( Neo4jException ignore ) + { + return null; + } + } + private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception { final CountDownLatch beforeRunLatch = new CountDownLatch( count ); @@ -1133,6 +1160,17 @@ private static ExecutorService newExecutor() return Executors.newCachedThreadPool( daemon( CausalClusteringIT.class.getSimpleName() + "-thread-" ) ); } + private static boolean isSingleFollowerWithReadReplicas( ClusterOverview overview ) + { + if ( overview == null ) + { + return false; + } + return overview.leaderCount == 0 && + overview.followerCount == 1 && + overview.readReplicaCount == ClusterRule.READ_REPLICA_COUNT; + } + private static class RecordAndSummary { final Record record; @@ -1144,4 +1182,28 @@ private static class RecordAndSummary this.summary = summary; } } + + private static class ClusterOverview + { + final int leaderCount; + final int followerCount; + final int readReplicaCount; + + ClusterOverview( int leaderCount, int followerCount, int readReplicaCount ) + { + this.leaderCount = leaderCount; + this.followerCount = followerCount; + this.readReplicaCount = readReplicaCount; + } + + @Override + public String toString() + { + return "ClusterOverview{" + + "leaderCount=" + leaderCount + + ", followerCount=" + followerCount + + ", readReplicaCount=" + readReplicaCount + + '}'; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java index 8b66616e35..e56e4cd189 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java @@ -183,8 +183,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable { Transaction tx1 = session.beginTransaction(); - tx1.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); + StatementResult result = tx1.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); session.reset(); @@ -210,6 +209,17 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable assertThat( e.getMessage(), containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); } + + // Make sure failure from the terminated long running statement is propagated + try + { + result.consume(); + fail( "Exception expected" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } } } @@ -584,25 +594,33 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable CountDownLatch beforeCommit = new CountDownLatch( 1 ); CountDownLatch afterReset = new CountDownLatch( 1 ); - executor.submit( () -> + Future txFuture = executor.submit( () -> { - try ( Transaction tx1 = session.beginTransaction() ) + Transaction tx1 = session.beginTransaction(); + tx1.run( "CREATE (n:FirstNode)" ); + beforeCommit.countDown(); + afterReset.await(); + + // session has been reset, it should not be possible to commit the transaction + try + { + tx1.success(); + tx1.close(); + } + catch ( Neo4jException ignore ) { - tx1.run( "CREATE (n:FirstNode)" ); - beforeCommit.countDown(); - afterReset.await(); } try ( Transaction tx2 = session.beginTransaction() ) { - tx2.run( "CREATE (n:FirstNode)" ); + tx2.run( "CREATE (n:SecondNode)" ); tx2.success(); } return null; } ); - executor.submit( () -> + Future resetFuture = executor.submit( () -> { beforeCommit.await(); session.reset(); @@ -611,12 +629,13 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable } ); executor.shutdown(); - executor.awaitTermination( 10, SECONDS ); + executor.awaitTermination( 20, SECONDS ); + + txFuture.get( 20, SECONDS ); + resetFuture.get( 20, SECONDS ); - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); + assertEquals( 0, countNodes( "FirstNode" ) ); + assertEquals( 1, countNodes( "SecondNode" ) ); } private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception @@ -812,10 +831,15 @@ private void awaitActiveQueriesToContain( String value ) } private long countNodes() + { + return countNodes( null ); + } + + private long countNodes( String label ) { try ( Session session = neo4j.driver().session() ) { - StatementResult result = session.run( "MATCH (n) RETURN count(n) AS result" ); + StatementResult result = session.run( "MATCH (n" + (label == null ? "" : ":" + label) + ") RETURN count(n) AS result" ); return result.single().get( 0 ).asLong(); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index c9ebc981cd..bc5953226a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -18,6 +18,9 @@ */ package org.neo4j.driver.v1.integration; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.Future; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -37,6 +40,12 @@ import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.async.EventLoopGroupFactory; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.retry.RetrySettings; +import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -67,6 +76,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError; import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter; @@ -1308,6 +1318,18 @@ public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() assertEquals( 1, countNodes( 42 ) ); } + @Test + public void shouldPropagateCommitFailureAfterFatalError() + { + testCommitAndRollbackFailurePropagation( true ); + } + + @Test + public void shouldPropagateRollbackFailureAfterFatalError() + { + testCommitAndRollbackFailurePropagation( false ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); @@ -1356,6 +1378,45 @@ private void testConsume( String query ) assertNull( await( cursor.nextAsync() ) ); } + private void testCommitAndRollbackFailurePropagation( boolean commit ) + { + ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + + try ( Driver driver = driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) ) + { + try ( Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + + // run query but do not consume the result + tx.run( "UNWIND range(0, 10000) AS x RETURN x + 1" ); + + IOException ioError = new IOException( "Connection reset by peer" ); + for ( Channel channel : driverFactory.channels() ) + { + // make channel experience a fatal network error + // run in the event loop thread and wait for the whole operation to complete + Future future = channel.eventLoop().submit( () -> channel.pipeline().fireExceptionCaught( ioError ) ); + await( future ); + } + + CompletionStage commitOrRollback = commit ? tx.commitAsync() : tx.rollbackAsync(); + + // commit/rollback should fail and propagate the network error + try + { + await( commitOrRollback ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( ioError, e.getCause() ); + } + } + } + } + private void assumeDatabaseSupportsBookmarks() { assumeTrue( "Neo4j " + neo4j.version() + " does not support bookmarks", diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java b/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java deleted file mode 100644 index 9ea5d45117..0000000000 --- a/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.v1.util; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; - -public class RecordingByteChannel implements WritableByteChannel, ReadableByteChannel -{ - private final ByteBuffer buffer = ByteBuffer.allocate( 16 * 1024 ); - private int writePosition = 0; - private int readPosition = 0; - private boolean eof; - - @Override - public boolean isOpen() - { - return true; - } - - @Override - public void close() throws IOException - { - - } - - @Override - public int write( ByteBuffer src ) throws IOException - { - buffer.position( writePosition ); - int originalPosition = writePosition; - - buffer.put( src ); - - writePosition = buffer.position(); - return writePosition - originalPosition; - } - - @Override - public int read( ByteBuffer dst ) throws IOException - { - if ( readPosition == writePosition ) - { - return eof ? -1 : 0; - } - buffer.position( readPosition ); - int originalPosition = readPosition; - int originalLimit = buffer.limit(); - - buffer.limit( Math.min( buffer.position() + (dst.limit() - dst.position()), writePosition ) ); - dst.put( buffer ); - - readPosition = buffer.position(); - buffer.limit( originalLimit ); - return readPosition - originalPosition; - } - - public byte[] getBytes() - { - byte[] bytes = new byte[buffer.position()]; - buffer.position( 0 ); - buffer.get( bytes ); - return bytes; - } - - /** - * Mark this buffer as ended. Once whatever is currently unread in it is consumed, - * it will start yielding -1 responses. - */ - public void eof() - { - eof = true; - } -} diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java index 034fb7bee9..10485b1da0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java @@ -123,6 +123,14 @@ public ClusterMember anyReadReplica() return randomOf( readReplicas() ); } + public Set cores() + { + Set readReplicas = membersWithRole( ClusterMemberRole.READ_REPLICA ); + Set cores = new HashSet<>( members ); + cores.removeAll( readReplicas ); + return cores; + } + public Set readReplicas() { return membersWithRole( ClusterMemberRole.READ_REPLICA ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java index b4f8782644..06f05c9a53 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java @@ -30,18 +30,18 @@ import org.neo4j.driver.v1.util.Neo4jRunner; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.v1.util.Neo4jRunner.PASSWORD; import static org.neo4j.driver.v1.util.Neo4jRunner.TARGET_DIR; +import static org.neo4j.driver.v1.util.Neo4jRunner.USER; import static org.neo4j.driver.v1.util.cc.CommandLineUtil.boltKitAvailable; public class ClusterRule extends ExternalResource { private static final Path CLUSTER_DIR = Paths.get( TARGET_DIR, "test-cluster" ).toAbsolutePath(); - private static final String PASSWORD = "test"; private static final int INITIAL_PORT = 20_000; - // todo: should be possible to configure (dynamically add/remove) cores and read replicas - private static final int CORE_COUNT = 3; - private static final int READ_REPLICA_COUNT = 2; + public static final int CORE_COUNT = 3; + public static final int READ_REPLICA_COUNT = 2; public Cluster getCluster() { @@ -50,7 +50,7 @@ public Cluster getCluster() public AuthToken getDefaultAuthToken() { - return AuthTokens.basic( "neo4j", PASSWORD ); + return AuthTokens.basic( USER, PASSWORD ); } public static void stopSharedCluster()