Skip to content

Commit c78518f

Browse files
committed
Fixed error when connection killed during tx
Connection error can happen while transaction is executing. Closing transaction after such error should not perform rollback because connection is dead and database has cleaned up all resources. Previously close operation tried to perform rollback and failed with same `ServiceUnavailable` exception as the unsuccessful `#run()`. Error thrown from `#close()` has to be added as a suppressed error to the one thrown from `#run()`, when used in try-with-resources block. So code attempted to add error to itself as suppressed. This resulted in an `IllegalArgumentException`. This commit fixes the problem by making `#close()` not perform a rollback after a fatal connection error.
1 parent 4711db9 commit c78518f

File tree

8 files changed

+147
-32
lines changed

8 files changed

+147
-32
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.CompletionStage;
24-
import java.util.function.BiConsumer;
2524
import java.util.function.BiFunction;
2625

2726
import org.neo4j.driver.internal.async.QueryRunner;
@@ -69,7 +68,8 @@ private enum State
6968
MARKED_FAILED( true ),
7069

7170
/**
72-
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
71+
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
72+
* fatal connection error.
7373
*/
7474
TERMINATED( false ),
7575

@@ -181,14 +181,13 @@ else if ( state == State.ROLLED_BACK )
181181
}
182182
else if ( state == State.TERMINATED )
183183
{
184-
return failedFuture(
185-
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
184+
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
186185
}
187186
else
188187
{
189188
return resultCursors.retrieveNotConsumedError()
190189
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
191-
.whenComplete( transactionClosed( State.COMMITTED ) );
190+
.whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) );
192191
}
193192
}
194193

@@ -205,15 +204,15 @@ else if ( state == State.ROLLED_BACK )
205204
}
206205
else if ( state == State.TERMINATED )
207206
{
208-
// transaction has been terminated by RESET and should be rolled back by the database
209-
state = State.ROLLED_BACK;
207+
// no need for explicit rollback, transaction should've been rolled back by the database
208+
transactionClosed( State.ROLLED_BACK );
210209
return completedWithNull();
211210
}
212211
else
213212
{
214213
return resultCursors.retrieveNotConsumedError()
215214
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
216-
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
215+
.whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) );
217216
}
218217
}
219218

@@ -314,8 +313,7 @@ else if ( state == State.MARKED_FAILED )
314313
}
315314
else if ( state == State.TERMINATED )
316315
{
317-
throw new ClientException(
318-
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
316+
throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" );
319317
}
320318
}
321319

@@ -394,14 +392,11 @@ else if ( commitOrRollbackError != null )
394392
};
395393
}
396394

397-
private BiConsumer<Object,Throwable> transactionClosed( State newState )
395+
private void transactionClosed( State newState )
398396
{
399-
return ( ignore, error ) ->
400-
{
401-
state = newState;
402-
connection.release(); // release in background
403-
session.setBookmark( bookmark );
404-
};
397+
state = newState;
398+
connection.release(); // release in background
399+
session.setBookmark( bookmark );
405400
}
406401

407402
private void terminateConnectionOnThreadInterrupt( String reason )

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.neo4j.driver.internal.ExplicitTransaction;
2222
import org.neo4j.driver.internal.spi.Connection;
23+
import org.neo4j.driver.internal.util.ErrorUtil;
2324
import org.neo4j.driver.v1.Statement;
2425

2526
import static java.util.Objects.requireNonNull;
@@ -43,6 +44,13 @@ protected void afterSuccess()
4344
@Override
4445
protected void afterFailure( Throwable error )
4546
{
46-
tx.failure();
47+
if ( ErrorUtil.isFatal( error ) )
48+
{
49+
tx.markTerminated();
50+
}
51+
else
52+
{
53+
tx.failure();
54+
}
4755
}
4856
}

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,18 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds()
256256
verify( connection, never() ).release();
257257
}
258258

259+
@Test
260+
public void shouldReleaseConnectionWhenTerminatedAndRolledBack()
261+
{
262+
Connection connection = connectionMock();
263+
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
264+
265+
tx.markTerminated();
266+
await( tx.rollbackAsync() );
267+
268+
verify( connection ).release();
269+
}
270+
259271
private static ExplicitTransaction beginTx( Connection connection )
260272
{
261273
return beginTx( connection, Bookmark.empty() );

driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@
2020

2121
import org.junit.Test;
2222

23+
import java.io.IOException;
2324
import java.util.concurrent.CompletableFuture;
2425

2526
import org.neo4j.driver.internal.BoltServerAddress;
2627
import org.neo4j.driver.internal.ExplicitTransaction;
2728
import org.neo4j.driver.internal.spi.Connection;
2829
import org.neo4j.driver.internal.util.ServerVersion;
2930
import org.neo4j.driver.v1.Statement;
31+
import org.neo4j.driver.v1.exceptions.ClientException;
32+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
33+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
34+
import org.neo4j.driver.v1.exceptions.TransientException;
3035

3136
import static org.mockito.Mockito.mock;
3237
import static org.mockito.Mockito.verify;
@@ -35,7 +40,26 @@
3540
public class TransactionPullAllResponseHandlerTest
3641
{
3742
@Test
38-
public void shouldMarkTransactionAsFailedOnFailure()
43+
public void shouldMarkTransactionAsFailedOnNonFatalFailures()
44+
{
45+
testErrorHandling( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ), false );
46+
testErrorHandling( new ClientException( "Neo.ClientError.Procedure.ProcedureCallFailed", "" ), false );
47+
testErrorHandling( new TransientException( "Neo.TransientError.Transaction.Terminated", "" ), false );
48+
testErrorHandling( new TransientException( "Neo.TransientError.General.DatabaseUnavailable", "" ), false );
49+
}
50+
51+
@Test
52+
public void shouldMarkTransactionAsTerminatedOnFatalFailures()
53+
{
54+
testErrorHandling( new RuntimeException(), true );
55+
testErrorHandling( new IOException(), true );
56+
testErrorHandling( new ServiceUnavailableException( "" ), true );
57+
testErrorHandling( new SessionExpiredException( "" ), true );
58+
testErrorHandling( new SessionExpiredException( "" ), true );
59+
testErrorHandling( new ClientException( "Neo.ClientError.Request.Invalid" ), true );
60+
}
61+
62+
private static void testErrorHandling( Throwable error, boolean fatal )
3963
{
4064
Connection connection = mock( Connection.class );
4165
when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT );
@@ -44,8 +68,15 @@ public void shouldMarkTransactionAsFailedOnFailure()
4468
TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ),
4569
new RunResponseHandler( new CompletableFuture<>() ), connection, tx );
4670

47-
handler.onFailure( new RuntimeException() );
71+
handler.onFailure( error );
4872

49-
verify( tx ).failure();
73+
if ( fatal )
74+
{
75+
verify( tx ).markTerminated();
76+
}
77+
else
78+
{
79+
verify( tx ).failure();
80+
}
5081
}
5182
}

driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.neo4j.driver.internal.BoltServerAddress;
2929
import org.neo4j.driver.internal.ConnectionSettings;
30+
import org.neo4j.driver.internal.async.BootstrapFactory;
3031
import org.neo4j.driver.internal.async.ChannelConnector;
3132
import org.neo4j.driver.internal.security.SecurityPlan;
3233
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -36,11 +37,24 @@
3637
public class ChannelTrackingDriverFactory extends DriverFactoryWithClock
3738
{
3839
private final List<Channel> channels = new CopyOnWriteArrayList<>();
40+
private final int eventLoopThreads;
3941
private ConnectionPool pool;
4042

4143
public ChannelTrackingDriverFactory( Clock clock )
44+
{
45+
this( 0, clock );
46+
}
47+
48+
public ChannelTrackingDriverFactory( int eventLoopThreads, Clock clock )
4249
{
4350
super( clock );
51+
this.eventLoopThreads = eventLoopThreads;
52+
}
53+
54+
@Override
55+
protected Bootstrap createBootstrap()
56+
{
57+
return eventLoopThreads == 0 ? super.createBootstrap() : BootstrapFactory.newBootstrap( eventLoopThreads );
4458
}
4559

4660
@Override

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,7 @@ public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
263263
assertThat( tx2, notNullValue() );
264264

265265
exception.expect( ClientException.class ); // errors differ depending of neo4j version
266-
exception.expectMessage(
267-
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
266+
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" );
268267

269268
tx1.run( "RETURN 1" );
270269
}

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -653,15 +653,16 @@ public void shouldFailWhenListTransformationFunctionFails()
653653
}
654654

655655
@Test
656-
public void shouldFailWhenServerIsRestarted()
656+
public void shouldFailToCommitWhenServerIsRestarted()
657657
{
658658
Transaction tx = await( session.beginTransactionAsync() );
659659

660+
await( tx.runAsync( "CREATE ()" ) );
661+
660662
neo4j.killDb();
661663

662664
try
663665
{
664-
await( tx.runAsync( "CREATE ()" ) );
665666
await( tx.commitAsync() );
666667
fail( "Exception expected" );
667668
}
@@ -806,7 +807,7 @@ public void shouldFailToCommitAfterTermination()
806807
}
807808
catch ( ClientException e )
808809
{
809-
assertEquals( "Can't commit, transaction has been terminated by `Session#reset()`", e.getMessage() );
810+
assertEquals( "Can't commit, transaction has been terminated", e.getMessage() );
810811
}
811812
assertFalse( tx.isOpen() );
812813
}
@@ -924,8 +925,7 @@ public void shouldFailToRunQueryWhenTerminated()
924925
}
925926
catch ( ClientException e )
926927
{
927-
assertEquals( "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`",
928-
e.getMessage() );
928+
assertEquals( "Cannot run more statements in this transaction, it has been terminated", e.getMessage() );
929929
}
930930
}
931931

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.v1.integration;
2020

21+
import io.netty.channel.Channel;
2122
import org.junit.Rule;
2223
import org.junit.Test;
2324
import org.junit.rules.ExpectedException;
@@ -27,6 +28,11 @@
2728
import java.util.concurrent.Executors;
2829
import java.util.concurrent.TimeUnit;
2930

31+
import org.neo4j.driver.internal.cluster.RoutingSettings;
32+
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
33+
import org.neo4j.driver.internal.util.Clock;
34+
import org.neo4j.driver.v1.Config;
35+
import org.neo4j.driver.v1.Driver;
3036
import org.neo4j.driver.v1.Record;
3137
import org.neo4j.driver.v1.Session;
3238
import org.neo4j.driver.v1.StatementResult;
@@ -39,12 +45,14 @@
3945

4046
import static org.hamcrest.CoreMatchers.containsString;
4147
import static org.hamcrest.CoreMatchers.equalTo;
42-
import static org.hamcrest.MatcherAssert.assertThat;
4348
import static org.junit.Assert.assertEquals;
4449
import static org.junit.Assert.assertFalse;
4550
import static org.junit.Assert.assertNotNull;
51+
import static org.junit.Assert.assertThat;
4652
import static org.junit.Assert.assertTrue;
4753
import static org.junit.Assert.fail;
54+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
55+
import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT;
4856

4957
public class TransactionIT
5058
{
@@ -249,7 +257,7 @@ public void shouldHandleResetBeforeRun() throws Throwable
249257
{
250258
// Expect
251259
exception.expect( ClientException.class );
252-
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated by" );
260+
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" );
253261
// When
254262
Transaction tx = session.beginTransaction();
255263
session.reset();
@@ -391,7 +399,7 @@ public void shouldPropagateFailureFromSummary()
391399
@Test
392400
public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception
393401
{
394-
try ( Session otherSession = this.session.driver().session() )
402+
try ( Session otherSession = session.driver().session() )
395403
{
396404
session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume();
397405

@@ -425,7 +433,7 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc
425433
@Test
426434
public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exception
427435
{
428-
try ( Session otherSession = this.session.driver().session() )
436+
try ( Session otherSession = session.driver().session() )
429437
{
430438
session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume();
431439

@@ -458,4 +466,52 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exc
458466
}
459467
}
460468
}
469+
470+
@Test
471+
public void shouldThrowWhenConnectionKilledDuringTransaction()
472+
{
473+
testFailWhenConnectionKilledDuringTransaction( false );
474+
}
475+
476+
@Test
477+
public void shouldThrowWhenConnectionKilledDuringTransactionMarkedForSuccess()
478+
{
479+
testFailWhenConnectionKilledDuringTransaction( true );
480+
}
481+
482+
private void testFailWhenConnectionKilledDuringTransaction( boolean markForSuccess )
483+
{
484+
ChannelTrackingDriverFactory factory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );
485+
RoutingSettings instance = new RoutingSettings( 1, 0 );
486+
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig();
487+
488+
try ( Driver driver = factory.newInstance( session.uri(), session.authToken(), instance, DEFAULT, config ) )
489+
{
490+
try ( Session session = driver.session();
491+
Transaction tx = session.beginTransaction() )
492+
{
493+
tx.run( "CREATE (:MyNode {id: 1})" ).consume();
494+
495+
if ( markForSuccess )
496+
{
497+
tx.success();
498+
}
499+
500+
// kill all network channels
501+
for ( Channel channel : factory.channels() )
502+
{
503+
channel.close().syncUninterruptibly();
504+
}
505+
506+
tx.run( "CREATE (:MyNode {id: 1})" ).consume();
507+
fail( "Exception expected" );
508+
}
509+
catch ( ServiceUnavailableException e )
510+
{
511+
assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) );
512+
}
513+
}
514+
515+
assertEquals( 0, session.run( "MATCH (n:MyNode {id: 1}) RETURN count(n)" ).single().get( 0 ).asInt() );
516+
}
461517
}

0 commit comments

Comments
 (0)