Skip to content

Commit 87a46c8

Browse files
committed
Check for fatal errors last in the transaction
Fatal errors terminate the transaction. It will fail to commit when terminated and rollback will be a no-op. All outstanding errors from previous queries should still be propagated. Previously transaction checked it's terminated status too early and sometimes did not propagate pending errors. This commit fixes the problem by moving the termination check to a place where all result cursors are fully fetched. So errors from cursors will always get propagated. Also added a constant for default `RoutingSettings` for convenience.
1 parent dc9b272 commit 87a46c8

File tree

5 files changed

+116
-28
lines changed

5 files changed

+116
-28
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK )
173173
{
174174
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
175175
}
176-
else if ( state == State.TERMINATED )
177-
{
178-
transactionClosed( State.ROLLED_BACK );
179-
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
180-
}
181176
else
182177
{
183178
return resultCursors.retrieveNotConsumedError()
@@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK )
197192
{
198193
return completedWithNull();
199194
}
200-
else if ( state == State.TERMINATED )
201-
{
202-
// no need for explicit rollback, transaction should've been rolled back by the database
203-
transactionClosed( State.ROLLED_BACK );
204-
return completedWithNull();
205-
}
206195
else
207196
{
208197
return resultCursors.retrieveNotConsumedError()
@@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark )
344333

345334
private CompletionStage<Void> doCommitAsync()
346335
{
336+
if ( state == State.TERMINATED )
337+
{
338+
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
339+
}
340+
347341
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
348342
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
349343
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
@@ -352,6 +346,11 @@ private CompletionStage<Void> doCommitAsync()
352346

353347
private CompletionStage<Void> doRollbackAsync()
354348
{
349+
if ( state == State.TERMINATED )
350+
{
351+
return completedWithNull();
352+
}
353+
355354
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
356355
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
357356
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
2123
public class RoutingSettings
2224
{
25+
public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) );
26+
2327
private final int maxRoutingFailures;
2428
private final long retryTimeoutDelay;
2529
private final RoutingContext routingContext;

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ public static class ConfigBuilder
267267
private boolean encrypted = true;
268268
private TrustStrategy trustStrategy = trustAllCertificates();
269269
private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED;
270-
private int routingFailureLimit = 1;
271-
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
270+
private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures();
271+
private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay();
272272
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
273273
private RetrySettings retrySettings = RetrySettings.DEFAULT;
274274

driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
183183
{
184184
Transaction tx1 = session.beginTransaction();
185185

186-
tx1.run( "CALL test.driver.longRunningStatement({seconds})",
187-
parameters( "seconds", 10 ) );
186+
StatementResult result = tx1.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) );
188187

189188
awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" );
190189
session.reset();
@@ -210,6 +209,17 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
210209
assertThat( e.getMessage(),
211210
containsString( "Cannot run more statements in this transaction, it has been terminated" ) );
212211
}
212+
213+
// Make sure failure from the terminated long running statement is propagated
214+
try
215+
{
216+
result.consume();
217+
fail( "Exception expected" );
218+
}
219+
catch ( Neo4jException e )
220+
{
221+
assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) );
222+
}
213223
}
214224
}
215225

@@ -584,25 +594,33 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable
584594
CountDownLatch beforeCommit = new CountDownLatch( 1 );
585595
CountDownLatch afterReset = new CountDownLatch( 1 );
586596

587-
executor.submit( () ->
597+
Future<Void> txFuture = executor.submit( () ->
588598
{
589-
try ( Transaction tx1 = session.beginTransaction() )
599+
Transaction tx1 = session.beginTransaction();
600+
tx1.run( "CREATE (n:FirstNode)" );
601+
beforeCommit.countDown();
602+
afterReset.await();
603+
604+
// session has been reset, it should not be possible to commit the transaction
605+
try
606+
{
607+
tx1.success();
608+
tx1.close();
609+
}
610+
catch ( Neo4jException ignore )
590611
{
591-
tx1.run( "CREATE (n:FirstNode)" );
592-
beforeCommit.countDown();
593-
afterReset.await();
594612
}
595613

596614
try ( Transaction tx2 = session.beginTransaction() )
597615
{
598-
tx2.run( "CREATE (n:FirstNode)" );
616+
tx2.run( "CREATE (n:SecondNode)" );
599617
tx2.success();
600618
}
601619

602620
return null;
603621
} );
604622

605-
executor.submit( () ->
623+
Future<Void> resetFuture = executor.submit( () ->
606624
{
607625
beforeCommit.await();
608626
session.reset();
@@ -611,12 +629,13 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable
611629
} );
612630

613631
executor.shutdown();
614-
executor.awaitTermination( 10, SECONDS );
632+
executor.awaitTermination( 20, SECONDS );
633+
634+
txFuture.get( 20, SECONDS );
635+
resetFuture.get( 20, SECONDS );
615636

616-
// Then the outcome of both statements should be visible
617-
StatementResult result = session.run( "MATCH (n) RETURN count(n)" );
618-
long nodes = result.single().get( "count(n)" ).asLong();
619-
assertThat( nodes, equalTo( 1L ) );
637+
assertEquals( 0, countNodes( "FirstNode" ) );
638+
assertEquals( 1, countNodes( "SecondNode" ) );
620639
}
621640

622641
private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception
@@ -812,10 +831,15 @@ private void awaitActiveQueriesToContain( String value )
812831
}
813832

814833
private long countNodes()
834+
{
835+
return countNodes( null );
836+
}
837+
838+
private long countNodes( String label )
815839
{
816840
try ( Session session = neo4j.driver().session() )
817841
{
818-
StatementResult result = session.run( "MATCH (n) RETURN count(n) AS result" );
842+
StatementResult result = session.run( "MATCH (n" + (label == null ? "" : ":" + label) + ") RETURN count(n) AS result" );
819843
return result.single().get( 0 ).asLong();
820844
}
821845
}

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.neo4j.driver.v1.integration;
2020

21+
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelPipeline;
23+
import io.netty.util.concurrent.Future;
2124
import org.junit.After;
2225
import org.junit.Before;
2326
import org.junit.Rule;
@@ -37,6 +40,12 @@
3740

3841
import org.neo4j.driver.internal.ExplicitTransaction;
3942
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
43+
import org.neo4j.driver.internal.cluster.RoutingSettings;
44+
import org.neo4j.driver.internal.retry.RetrySettings;
45+
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
46+
import org.neo4j.driver.internal.util.Clock;
47+
import org.neo4j.driver.v1.Config;
48+
import org.neo4j.driver.v1.Driver;
4049
import org.neo4j.driver.v1.Record;
4150
import org.neo4j.driver.v1.Session;
4251
import org.neo4j.driver.v1.Statement;
@@ -67,6 +76,7 @@
6776
import static org.junit.Assert.assertTrue;
6877
import static org.junit.Assert.fail;
6978
import static org.junit.Assume.assumeTrue;
79+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
7080
import static org.neo4j.driver.internal.util.Iterables.single;
7181
import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
7282
import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter;
@@ -1308,6 +1318,18 @@ public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated()
13081318
assertEquals( 1, countNodes( 42 ) );
13091319
}
13101320

1321+
@Test
1322+
public void shouldPropagateCommitFailureAfterFatalError()
1323+
{
1324+
testCommitAndRollbackFailurePropagation( true );
1325+
}
1326+
1327+
@Test
1328+
public void shouldPropagateRollbackFailureAfterFatalError()
1329+
{
1330+
testCommitAndRollbackFailurePropagation( false );
1331+
}
1332+
13111333
private int countNodes( Object id )
13121334
{
13131335
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
@@ -1356,6 +1378,45 @@ private void testConsume( String query )
13561378
assertNull( await( cursor.nextAsync() ) );
13571379
}
13581380

1381+
private void testCommitAndRollbackFailurePropagation( boolean commit )
1382+
{
1383+
ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );
1384+
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig();
1385+
1386+
try ( Driver driver = driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) )
1387+
{
1388+
try ( Session session = driver.session() )
1389+
{
1390+
Transaction tx = session.beginTransaction();
1391+
1392+
// run query but do not consume the result
1393+
tx.run( "UNWIND range(0, 10000) AS x RETURN x + 1" );
1394+
1395+
IOException ioError = new IOException( "Connection reset by peer" );
1396+
for ( Channel channel : driverFactory.channels() )
1397+
{
1398+
// make channel experience a fatal network error
1399+
// run in the event loop thread and wait for the whole operation to complete
1400+
Future<ChannelPipeline> future = channel.eventLoop().submit( () -> channel.pipeline().fireExceptionCaught( ioError ) );
1401+
await( future );
1402+
}
1403+
1404+
CompletionStage<Void> commitOrRollback = commit ? tx.commitAsync() : tx.rollbackAsync();
1405+
1406+
// commit/rollback should fail and propagate the network error
1407+
try
1408+
{
1409+
await( commitOrRollback );
1410+
fail( "Exception expected" );
1411+
}
1412+
catch ( ServiceUnavailableException e )
1413+
{
1414+
assertEquals( ioError, e.getCause() );
1415+
}
1416+
}
1417+
}
1418+
}
1419+
13591420
private void assumeDatabaseSupportsBookmarks()
13601421
{
13611422
assumeTrue( "Neo4j " + neo4j.version() + " does not support bookmarks",

0 commit comments

Comments
 (0)