Skip to content

Commit 5705f9b

Browse files
committed
Call close on UnmanagedTransaction where possible
Calling `close` instead of separate `isOpen` and `commitAsync` requires less lock acquisitions and is safer.
1 parent 8929bd4 commit 5705f9b

File tree

5 files changed

+35
-41
lines changed

5 files changed

+35
-41
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private <T> void executeWork(CompletableFuture<T> resultFuture, UnmanagedTransac
146146
Throwable error = Futures.completionExceptionCause( completionError );
147147
if ( error != null )
148148
{
149-
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
149+
closeTxAfterFailedTransactionWork( tx, resultFuture, error );
150150
}
151151
else
152152
{
@@ -174,43 +174,33 @@ private <T> CompletionStage<T> safeExecuteWork(UnmanagedTransaction tx, AsyncTra
174174
}
175175
}
176176

177-
private <T> void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
177+
private <T> void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
178178
{
179-
if ( tx.isOpen() )
180-
{
181-
tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> {
182-
if ( rollbackError != null )
179+
tx.closeAsync().whenComplete(
180+
( ignore, rollbackError ) ->
183181
{
184-
error.addSuppressed( rollbackError );
185-
}
186-
resultFuture.completeExceptionally( error );
187-
} );
188-
}
189-
else
190-
{
191-
resultFuture.completeExceptionally( error );
192-
}
182+
if ( rollbackError != null )
183+
{
184+
error.addSuppressed( rollbackError );
185+
}
186+
resultFuture.completeExceptionally( error );
187+
} );
193188
}
194189

195190
private <T> void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, T result )
196191
{
197-
if ( tx.isOpen() )
198-
{
199-
tx.commitAsync().whenComplete( ( ignore, completionError ) -> {
200-
Throwable commitError = Futures.completionExceptionCause( completionError );
201-
if ( commitError != null )
192+
tx.closeAsync( true ).whenComplete(
193+
( ignore, completionError ) ->
202194
{
203-
resultFuture.completeExceptionally( commitError );
204-
}
205-
else
206-
{
207-
resultFuture.complete( result );
208-
}
209-
} );
210-
}
211-
else
212-
{
213-
resultFuture.complete( result );
214-
}
195+
Throwable commitError = Futures.completionExceptionCause( completionError );
196+
if ( commitError != null )
197+
{
198+
resultFuture.completeExceptionally( commitError );
199+
}
200+
else
201+
{
202+
resultFuture.complete( result );
203+
}
204+
} );
215205
}
216206
}

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,12 @@ else if ( beginError instanceof ConnectionReadTimeoutException )
134134

135135
public CompletionStage<Void> closeAsync()
136136
{
137-
return closeAsync( false, true );
137+
return closeAsync( false );
138+
}
139+
140+
public CompletionStage<Void> closeAsync( boolean commit )
141+
{
142+
return closeAsync( commit, true );
138143
}
139144

140145
public CompletionStage<Void> commitAsync()

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
130130
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
131131
{
132132
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
133-
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close );
133+
tx -> tx.close( true ), ( tx, error ) -> tx.close(), InternalRxTransaction::close );
134134
return session.retryLogic().retryRx( repeatableWork );
135135
}
136136

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.neo4j.driver.reactive.RxTransaction;
3131

3232
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
33-
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3433

3534
public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
3635
{
@@ -77,13 +76,13 @@ public <T> Publisher<T> rollback()
7776
return createEmptyPublisher( tx::rollbackAsync );
7877
}
7978

80-
Publisher<Void> commitIfOpen()
79+
Publisher<Void> close()
8180
{
82-
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
81+
return close( false );
8382
}
8483

85-
Publisher<Void> close()
84+
Publisher<Void> close( boolean commit )
8685
{
87-
return createEmptyPublisher( tx::closeAsync );
86+
return createEmptyPublisher( () -> tx.closeAsync( commit ) );
8887
}
8988
}

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ void shouldCommitWhenOpen()
147147
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
148148

149149
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
150-
Publisher<Void> publisher = rxTx.commitIfOpen();
150+
Publisher<Void> publisher = rxTx.close( true );
151151
StepVerifier.create( publisher ).verifyComplete();
152152

153153
verify( tx ).commitAsync();
@@ -161,7 +161,7 @@ void shouldNotCommitWhenNotOpen()
161161
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
162162

163163
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
164-
Publisher<Void> publisher = rxTx.commitIfOpen();
164+
Publisher<Void> publisher = rxTx.close( true );
165165
StepVerifier.create( publisher ).verifyComplete();
166166

167167
verify( tx, never() ).commitAsync();

0 commit comments

Comments
 (0)