Skip to content

Commit de16d52

Browse files
authored
Fix reactive transaction function resource cleanup logic (#1009) (#1013)
This update fixes an issue when transaction function could fail if transaction has been explicitly committed or an explicit commit has failed.
1 parent 44fc1fd commit de16d52

File tree

5 files changed

+65
-30
lines changed

5 files changed

+65
-30
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
8181
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
8282
}
8383

84-
private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
84+
private Publisher<InternalRxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
8585
{
8686
return createSingleItemPublisher(
8787
() ->
8888
{
89-
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
89+
CompletableFuture<InternalRxTransaction> txFuture = new CompletableFuture<>();
9090
session.beginTransactionAsync( mode, config ).whenComplete(
9191
( tx, completionError ) ->
9292
{
@@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
130130
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
131131
{
132132
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
133-
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
133+
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null );
134134
return session.retryLogic().retryRx( repeatableWork );
135135
}
136136

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import org.neo4j.driver.Query;
2221
import org.reactivestreams.Publisher;
2322

2423
import java.util.concurrent.CompletableFuture;
2524

25+
import org.neo4j.driver.Query;
2626
import org.neo4j.driver.internal.async.UnmanagedTransaction;
2727
import org.neo4j.driver.internal.cursor.RxResultCursor;
2828
import org.neo4j.driver.internal.util.Futures;
2929
import org.neo4j.driver.reactive.RxResult;
3030
import org.neo4j.driver.reactive.RxTransaction;
3131

3232
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
33+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3334

3435
public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
3536
{
@@ -67,26 +68,22 @@ public RxResult run(Query query)
6768
@Override
6869
public <T> Publisher<T> commit()
6970
{
70-
return close( true );
71+
return createEmptyPublisher( tx::commitAsync );
7172
}
7273

7374
@Override
7475
public <T> Publisher<T> rollback()
7576
{
76-
return close( false );
77+
return createEmptyPublisher( tx::rollbackAsync );
7778
}
7879

79-
private <T> Publisher<T> close( boolean commit )
80+
Publisher<Void> commitIfOpen()
8081
{
81-
return createEmptyPublisher( () -> {
82-
if ( commit )
83-
{
84-
return tx.commitAsync();
85-
}
86-
else
87-
{
88-
return tx.rollbackAsync();
89-
}
90-
} );
82+
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
83+
}
84+
85+
Publisher<Void> close()
86+
{
87+
return createEmptyPublisher( tx::closeAsync );
9188
}
9289
}

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.junit.jupiter.api.Test;
2222
import org.junit.jupiter.params.ParameterizedTest;
2323
import org.junit.jupiter.params.provider.MethodSource;
24-
import org.neo4j.driver.Query;
2524
import org.reactivestreams.Publisher;
2625
import reactor.core.publisher.Flux;
2726
import reactor.core.publisher.Mono;
@@ -35,11 +34,12 @@
3534
import java.util.stream.Stream;
3635

3736
import org.neo4j.driver.AccessMode;
37+
import org.neo4j.driver.Query;
3838
import org.neo4j.driver.TransactionConfig;
3939
import org.neo4j.driver.Value;
4040
import org.neo4j.driver.internal.InternalRecord;
41-
import org.neo4j.driver.internal.async.UnmanagedTransaction;
4241
import org.neo4j.driver.internal.async.NetworkSession;
42+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
4343
import org.neo4j.driver.internal.cursor.RxResultCursor;
4444
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
4545
import org.neo4j.driver.internal.util.FixedRetryLogic;
@@ -199,6 +199,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
199199
// Given
200200
NetworkSession session = mock( NetworkSession.class );
201201
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
202+
when( tx.isOpen() ).thenReturn( true );
202203
when( tx.commitAsync() ).thenReturn( completedWithNull() );
203204
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
204205

@@ -222,6 +223,7 @@ void shouldRetryOnError() throws Throwable
222223
int retryCount = 2;
223224
NetworkSession session = mock( NetworkSession.class );
224225
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
226+
when( tx.isOpen() ).thenReturn( true );
225227
when( tx.commitAsync() ).thenReturn( completedWithNull() );
226228
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
227229

@@ -239,7 +241,7 @@ void shouldRetryOnError() throws Throwable
239241

240242
// Then
241243
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
242-
verify( tx, times( retryCount + 1 ) ).rollbackAsync();
244+
verify( tx, times( retryCount + 1 ) ).closeAsync();
243245
}
244246

245247
@Test
@@ -249,6 +251,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
249251
int retryCount = 2;
250252
NetworkSession session = mock( NetworkSession.class );
251253
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
254+
when( tx.isOpen() ).thenReturn( true );
252255
when( tx.commitAsync() ).thenReturn( completedWithNull() );
253256
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
254257

@@ -273,7 +276,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
273276

274277
// Then
275278
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
276-
verify( tx, times( retryCount ) ).rollbackAsync();
279+
verify( tx, times( retryCount ) ).closeAsync();
277280
verify( tx ).commitAsync();
278281
}
279282

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import org.junit.Test;
21+
import org.junit.jupiter.api.Test;
2222
import org.junit.jupiter.params.ParameterizedTest;
2323
import org.junit.jupiter.params.provider.MethodSource;
24-
import org.neo4j.driver.Query;
2524
import org.reactivestreams.Publisher;
2625
import reactor.test.StepVerifier;
2726

@@ -30,6 +29,7 @@
3029
import java.util.function.Function;
3130
import java.util.stream.Stream;
3231

32+
import org.neo4j.driver.Query;
3333
import org.neo4j.driver.Value;
3434
import org.neo4j.driver.internal.InternalRecord;
3535
import org.neo4j.driver.internal.async.UnmanagedTransaction;
@@ -48,6 +48,7 @@
4848
import static org.junit.jupiter.api.Assertions.assertThrows;
4949
import static org.mockito.ArgumentMatchers.any;
5050
import static org.mockito.Mockito.mock;
51+
import static org.mockito.Mockito.never;
5152
import static org.mockito.Mockito.verify;
5253
import static org.mockito.Mockito.when;
5354
import static org.neo4j.driver.Values.parameters;
@@ -137,4 +138,45 @@ void shouldMarkTxIfFailedToRun( Function<RxTransaction, RxResult> runReturnOne )
137138
assertThat( t.getCause(), equalTo( error ) );
138139
verify( tx ).markTerminated( error );
139140
}
141+
142+
@Test
143+
void shouldCommitWhenOpen()
144+
{
145+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
146+
when( tx.isOpen() ).thenReturn( true );
147+
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
148+
149+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
150+
Publisher<Void> publisher = rxTx.commitIfOpen();
151+
StepVerifier.create( publisher ).verifyComplete();
152+
153+
verify( tx ).commitAsync();
154+
}
155+
156+
@Test
157+
void shouldNotCommitWhenNotOpen()
158+
{
159+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
160+
when( tx.isOpen() ).thenReturn( false );
161+
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
162+
163+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
164+
Publisher<Void> publisher = rxTx.commitIfOpen();
165+
StepVerifier.create( publisher ).verifyComplete();
166+
167+
verify( tx, never() ).commitAsync();
168+
}
169+
170+
@Test
171+
void shouldDelegateClose()
172+
{
173+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
174+
when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() );
175+
176+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
177+
Publisher<Void> publisher = rxTx.close();
178+
StepVerifier.create( publisher ).verifyComplete();
179+
180+
verify( tx ).closeAsync();
181+
}
140182
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,6 @@ public class StartTest implements TestkitRequest
6666
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
6767
"Does not support partially consumed state" );
6868
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
69-
REACTIVE_SKIP_PATTERN_TO_REASON.put(
70-
"^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$",
71-
"Rollback failures following commit failure" );
72-
REACTIVE_SKIP_PATTERN_TO_REASON.put(
73-
"^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$",
74-
"Rollback failures following commit failure" );
7569
skipMessage = "Requires investigation";
7670
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
7771
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
@@ -82,7 +76,6 @@ public class StartTest implements TestkitRequest
8276
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage );
8377
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage );
8478
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage );
85-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage );
8679
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage );
8780
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
8881
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );

0 commit comments

Comments
 (0)