Skip to content

Commit 5864fb6

Browse files
authored
Merge pull request #460 from lutovich/1.5-close-tx-in-session-close
Close open transaction when session is closed
2 parents bdb51bf + b0d0935 commit 5864fb6

File tree

5 files changed

+196
-46
lines changed

5 files changed

+196
-46
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.BiFunction;
2526

@@ -361,28 +362,12 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
361362
{
362363
return ( ignore, commitOrRollbackError ) ->
363364
{
364-
if ( cursorFailure != null && commitOrRollbackError != null )
365+
CompletionException combinedError = Futures.combineErrors( cursorFailure, commitOrRollbackError );
366+
if ( combinedError != null )
365367
{
366-
Throwable cause1 = Futures.completionExceptionCause( cursorFailure );
367-
Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError );
368-
if ( cause1 != cause2 )
369-
{
370-
cause1.addSuppressed( cause2 );
371-
}
372-
throw Futures.asCompletionException( cause1 );
373-
}
374-
else if ( cursorFailure != null )
375-
{
376-
throw Futures.asCompletionException( cursorFailure );
377-
}
378-
else if ( commitOrRollbackError != null )
379-
{
380-
throw Futures.asCompletionException( commitOrRollbackError );
381-
}
382-
else
383-
{
384-
return null;
368+
throw combinedError;
385369
}
370+
return null;
386371
};
387372
}
388373

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -167,24 +167,23 @@ public CompletionStage<Void> closeAsync()
167167
{
168168
return resultCursorStage.thenCompose( cursor ->
169169
{
170-
if ( cursor == null )
170+
if ( cursor != null )
171171
{
172-
return completedWithNull();
172+
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
173+
return cursor.failureAsync();
173174
}
174-
return cursor.failureAsync();
175-
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
175+
// no result cursor exists so no error exists
176+
return completedWithNull();
177+
} ).thenCompose( cursorError -> closeTransactionAndReleaseConnection().thenApply( txCloseError ->
176178
{
177-
if ( error != null )
178-
{
179-
// connection has been acquired and there is an unconsumed error in result cursor
180-
throw Futures.asCompletionException( error );
181-
}
182-
else
179+
// now we have cursor error, active transaction has been closed and connection has been released
180+
// back to the pool; try to propagate cursor and transaction close errors, if any
181+
CompletionException combinedError = Futures.combineErrors( cursorError, txCloseError );
182+
if ( combinedError != null )
183183
{
184-
// either connection acquisition failed or
185-
// there are no unconsumed errors in the result cursor
186-
return null;
184+
throw combinedError;
187185
}
186+
return null;
188187
} ) );
189188
}
190189
return completedWithNull();
@@ -520,26 +519,22 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
520519
return newConnectionStage;
521520
}
522521

523-
private CompletionStage<Void> releaseResources()
524-
{
525-
return rollbackTransaction().thenCompose( ignore -> releaseConnection() );
526-
}
527-
528-
private CompletionStage<Void> rollbackTransaction()
522+
private CompletionStage<Throwable> closeTransactionAndReleaseConnection()
529523
{
530524
return existingTransactionOrNull().thenCompose( tx ->
531525
{
532526
if ( tx != null )
533527
{
534-
return tx.rollbackAsync();
528+
// there exists an open transaction, let's close it and propagate the error, if any
529+
return tx.closeAsync()
530+
.thenApply( ignore -> (Throwable) null )
531+
.exceptionally( error -> error );
535532
}
533+
// no open transaction so nothing to close
536534
return completedWithNull();
537-
} ).exceptionally( error ->
538-
{
539-
Throwable cause = Futures.completionExceptionCause( error );
540-
logger.warn( "Active transaction rolled back with an error", cause );
541-
return null;
542-
} );
535+
} ).thenCompose( txCloseError ->
536+
// then release the connection and propagate transaction close error, if any
537+
releaseConnection().thenApply( ignore -> txCloseError ) );
543538
}
544539

545540
private CompletionStage<Void> releaseConnection()
@@ -548,8 +543,10 @@ private CompletionStage<Void> releaseConnection()
548543
{
549544
if ( connection != null )
550545
{
546+
// there exists connection, try to release it back to the pool
551547
return connection.release();
552548
}
549+
// no connection so return null
553550
return completedWithNull();
554551
} );
555552
}

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,40 @@ public static CompletionException asCompletionException( Throwable error )
172172
return new CompletionException( error );
173173
}
174174

175+
/**
176+
* Combine given errors into a single {@link CompletionException} to be rethrown from inside a
177+
* {@link CompletionStage} chain.
178+
*
179+
* @param error1 the first error or {@code null}.
180+
* @param error2 the second error or {@code null}.
181+
* @return {@code null} if both errors are null, {@link CompletionException} otherwise.
182+
*/
183+
public static CompletionException combineErrors( Throwable error1, Throwable error2 )
184+
{
185+
if ( error1 != null && error2 != null )
186+
{
187+
Throwable cause1 = completionExceptionCause( error1 );
188+
Throwable cause2 = completionExceptionCause( error2 );
189+
if ( cause1 != cause2 )
190+
{
191+
cause1.addSuppressed( cause2 );
192+
}
193+
return asCompletionException( cause1 );
194+
}
195+
else if ( error1 != null )
196+
{
197+
return asCompletionException( error1 );
198+
}
199+
else if ( error2 != null )
200+
{
201+
return asCompletionException( error2 );
202+
}
203+
else
204+
{
205+
return null;
206+
}
207+
}
208+
175209
private static void safeRun( Runnable runnable )
176210
{
177211
try

driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
3838

3939
import static org.hamcrest.Matchers.is;
40+
import static org.junit.Assert.assertArrayEquals;
4041
import static org.junit.Assert.assertEquals;
4142
import static org.junit.Assert.assertFalse;
4243
import static org.junit.Assert.assertNull;
@@ -360,4 +361,43 @@ public void shouldKeepCompletionExceptionAsIs()
360361
CompletionException error = new CompletionException( new RuntimeException( "Hello" ) );
361362
assertEquals( error, Futures.asCompletionException( error ) );
362363
}
364+
365+
@Test
366+
public void shouldCombineTwoErrors()
367+
{
368+
RuntimeException error1 = new RuntimeException( "Error1" );
369+
RuntimeException error2Cause = new RuntimeException( "Error2" );
370+
CompletionException error2 = new CompletionException( error2Cause );
371+
372+
CompletionException combined = Futures.combineErrors( error1, error2 );
373+
374+
assertEquals( error1, combined.getCause() );
375+
assertArrayEquals( new Throwable[]{error2Cause}, combined.getCause().getSuppressed() );
376+
}
377+
378+
@Test
379+
public void shouldCombineErrorAndNull()
380+
{
381+
RuntimeException error1 = new RuntimeException( "Error1" );
382+
383+
CompletionException combined = Futures.combineErrors( error1, null );
384+
385+
assertEquals( error1, combined.getCause() );
386+
}
387+
388+
@Test
389+
public void shouldCombineNullAndError()
390+
{
391+
RuntimeException error2 = new RuntimeException( "Error2" );
392+
393+
CompletionException combined = Futures.combineErrors( null, error2 );
394+
395+
assertEquals( error2, combined.getCause() );
396+
}
397+
398+
@Test
399+
public void shouldCombineNullAndNullErrors()
400+
{
401+
assertNull( Futures.combineErrors( null, null ) );
402+
}
363403
}

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,19 @@
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.logging.Level;
4041

4142
import org.neo4j.driver.internal.DriverFactory;
4243
import org.neo4j.driver.internal.cluster.RoutingContext;
4344
import org.neo4j.driver.internal.cluster.RoutingSettings;
45+
import org.neo4j.driver.internal.logging.ConsoleLogging;
4446
import org.neo4j.driver.internal.retry.RetrySettings;
4547
import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic;
4648
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
4749
import org.neo4j.driver.internal.util.ServerVersion;
4850
import org.neo4j.driver.v1.AccessMode;
4951
import org.neo4j.driver.v1.AuthToken;
52+
import org.neo4j.driver.v1.AuthTokens;
5053
import org.neo4j.driver.v1.Config;
5154
import org.neo4j.driver.v1.Driver;
5255
import org.neo4j.driver.v1.GraphDatabase;
@@ -62,6 +65,7 @@
6265
import org.neo4j.driver.v1.exceptions.TransientException;
6366
import org.neo4j.driver.v1.summary.ResultSummary;
6467
import org.neo4j.driver.v1.summary.StatementType;
68+
import org.neo4j.driver.v1.util.StubServer;
6569
import org.neo4j.driver.v1.util.TestNeo4j;
6670
import org.neo4j.driver.v1.util.TestUtil;
6771

@@ -1296,6 +1300,50 @@ public void shouldNotAllowStartingMultipleTransactions()
12961300
}
12971301
}
12981302

1303+
@Test
1304+
public void shouldCloseOpenTransactionWhenClosed()
1305+
{
1306+
try ( Session session = neo4j.driver().session() )
1307+
{
1308+
Transaction tx = session.beginTransaction();
1309+
tx.run( "CREATE (:Node {id: 123})" );
1310+
tx.run( "CREATE (:Node {id: 456})" );
1311+
1312+
tx.success();
1313+
}
1314+
1315+
assertEquals( 1, countNodesWithId( 123 ) );
1316+
assertEquals( 1, countNodesWithId( 456 ) );
1317+
}
1318+
1319+
@Test
1320+
public void shouldRollbackOpenTransactionWhenClosed()
1321+
{
1322+
try ( Session session = neo4j.driver().session() )
1323+
{
1324+
Transaction tx = session.beginTransaction();
1325+
tx.run( "CREATE (:Node {id: 123})" );
1326+
tx.run( "CREATE (:Node {id: 456})" );
1327+
1328+
tx.failure();
1329+
}
1330+
1331+
assertEquals( 0, countNodesWithId( 123 ) );
1332+
assertEquals( 0, countNodesWithId( 456 ) );
1333+
}
1334+
1335+
@Test
1336+
public void shouldPropagateTransactionCommitErrorWhenClosed() throws Exception
1337+
{
1338+
testTransactionCloseErrorPropagationWhenSessionClosed( "commit_error.script", true, "Unable to commit" );
1339+
}
1340+
1341+
@Test
1342+
public void shouldPropagateTransactionRollbackErrorWhenClosed() throws Exception
1343+
{
1344+
testTransactionCloseErrorPropagationWhenSessionClosed( "rollback_error.script", false, "Unable to rollback" );
1345+
}
1346+
12991347
private void testExecuteReadTx( AccessMode sessionMode )
13001348
{
13011349
Driver driver = neo4j.driver();
@@ -1501,6 +1549,52 @@ private static void await( CountDownLatch latch )
15011549
}
15021550
}
15031551

1552+
private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit,
1553+
String expectedErrorMessage ) throws Exception
1554+
{
1555+
StubServer server = StubServer.start( script, 9001 );
1556+
try
1557+
{
1558+
Config config = Config.build()
1559+
.withLogging( DEV_NULL_LOGGING )
1560+
.withLogging( new ConsoleLogging( Level.INFO ) )
1561+
.withoutEncryption()
1562+
.toConfig();
1563+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", AuthTokens.none(), config ) )
1564+
{
1565+
Session session = driver.session();
1566+
1567+
Transaction tx = session.beginTransaction();
1568+
StatementResult result = tx.run( "CREATE (n {name:'Alice'}) RETURN n.name AS name" );
1569+
assertEquals( "Alice", result.single().get( "name" ).asString() );
1570+
1571+
if ( commit )
1572+
{
1573+
tx.success();
1574+
}
1575+
else
1576+
{
1577+
tx.failure();
1578+
}
1579+
1580+
try
1581+
{
1582+
session.close();
1583+
fail( "Exception expected" );
1584+
}
1585+
catch ( TransientException e )
1586+
{
1587+
assertEquals( "Neo.TransientError.General.DatabaseUnavailable", e.code() );
1588+
assertEquals( expectedErrorMessage, e.getMessage() );
1589+
}
1590+
}
1591+
}
1592+
finally
1593+
{
1594+
assertEquals( 0, server.exitStatus() );
1595+
}
1596+
}
1597+
15041598
private static class ThrowingWork implements TransactionWork<Record>
15051599
{
15061600
final String query;

0 commit comments

Comments
 (0)