Skip to content

Commit 21a6c9b

Browse files
committed
Fix reactive transaction function resource cleanup logic
This update fixes an issue when transaction function could fail if transaction has been explicitly committed or an explicit commit has failed.
1 parent 55829c2 commit 21a6c9b

File tree

5 files changed

+65
-31
lines changed

5 files changed

+65
-31
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
8181
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
8282
}
8383

84-
private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
84+
private Publisher<InternalRxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
8585
{
8686
return createSingleItemPublisher(
8787
() ->
8888
{
89-
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
89+
CompletableFuture<InternalRxTransaction> txFuture = new CompletableFuture<>();
9090
session.beginTransactionAsync( mode, config ).whenComplete(
9191
( tx, completionError ) ->
9292
{
@@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
130130
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
131131
{
132132
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
133-
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
133+
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null );
134134
return session.retryLogic().retryRx( repeatableWork );
135135
}
136136

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import org.neo4j.driver.Query;
2221
import org.reactivestreams.Publisher;
2322

2423
import java.util.concurrent.CompletableFuture;
2524

25+
import org.neo4j.driver.Query;
2626
import org.neo4j.driver.internal.async.UnmanagedTransaction;
2727
import org.neo4j.driver.internal.cursor.RxResultCursor;
2828
import org.neo4j.driver.internal.util.Futures;
2929
import org.neo4j.driver.reactive.RxResult;
3030
import org.neo4j.driver.reactive.RxTransaction;
3131

3232
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
33+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3334

3435
public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
3536
{
@@ -67,26 +68,22 @@ public RxResult run(Query query)
6768
@Override
6869
public <T> Publisher<T> commit()
6970
{
70-
return close( true );
71+
return createEmptyPublisher( tx::commitAsync );
7172
}
7273

7374
@Override
7475
public <T> Publisher<T> rollback()
7576
{
76-
return close( false );
77+
return createEmptyPublisher( tx::rollbackAsync );
7778
}
7879

79-
private <T> Publisher<T> close( boolean commit )
80+
Publisher<Void> commitIfOpen()
8081
{
81-
return createEmptyPublisher( () -> {
82-
if ( commit )
83-
{
84-
return tx.commitAsync();
85-
}
86-
else
87-
{
88-
return tx.rollbackAsync();
89-
}
90-
} );
82+
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
83+
}
84+
85+
Publisher<Void> close()
86+
{
87+
return createEmptyPublisher( tx::closeAsync );
9188
}
9289
}

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.junit.jupiter.api.Test;
2222
import org.junit.jupiter.params.ParameterizedTest;
2323
import org.junit.jupiter.params.provider.MethodSource;
24-
import org.neo4j.driver.Query;
2524
import org.reactivestreams.Publisher;
2625
import reactor.core.publisher.Flux;
2726
import reactor.core.publisher.Mono;
@@ -35,11 +34,12 @@
3534
import java.util.stream.Stream;
3635

3736
import org.neo4j.driver.AccessMode;
37+
import org.neo4j.driver.Query;
3838
import org.neo4j.driver.TransactionConfig;
3939
import org.neo4j.driver.Value;
4040
import org.neo4j.driver.internal.InternalRecord;
41-
import org.neo4j.driver.internal.async.UnmanagedTransaction;
4241
import org.neo4j.driver.internal.async.NetworkSession;
42+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
4343
import org.neo4j.driver.internal.cursor.RxResultCursor;
4444
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
4545
import org.neo4j.driver.internal.util.FixedRetryLogic;
@@ -199,6 +199,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
199199
// Given
200200
NetworkSession session = mock( NetworkSession.class );
201201
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
202+
when( tx.isOpen() ).thenReturn( true );
202203
when( tx.commitAsync() ).thenReturn( completedWithNull() );
203204
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
204205

@@ -222,6 +223,7 @@ void shouldRetryOnError() throws Throwable
222223
int retryCount = 2;
223224
NetworkSession session = mock( NetworkSession.class );
224225
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
226+
when( tx.isOpen() ).thenReturn( true );
225227
when( tx.commitAsync() ).thenReturn( completedWithNull() );
226228
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
227229

@@ -239,7 +241,7 @@ void shouldRetryOnError() throws Throwable
239241

240242
// Then
241243
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
242-
verify( tx, times( retryCount + 1 ) ).rollbackAsync();
244+
verify( tx, times( retryCount + 1 ) ).closeAsync();
243245
}
244246

245247
@Test
@@ -249,6 +251,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
249251
int retryCount = 2;
250252
NetworkSession session = mock( NetworkSession.class );
251253
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
254+
when( tx.isOpen() ).thenReturn( true );
252255
when( tx.commitAsync() ).thenReturn( completedWithNull() );
253256
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
254257

@@ -273,7 +276,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
273276

274277
// Then
275278
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
276-
verify( tx, times( retryCount ) ).rollbackAsync();
279+
verify( tx, times( retryCount ) ).closeAsync();
277280
verify( tx ).commitAsync();
278281
}
279282

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21-
import org.junit.Test;
21+
import org.junit.jupiter.api.Test;
2222
import org.junit.jupiter.params.ParameterizedTest;
2323
import org.junit.jupiter.params.provider.MethodSource;
24-
import org.neo4j.driver.Query;
2524
import org.reactivestreams.Publisher;
2625
import reactor.test.StepVerifier;
2726

@@ -30,6 +29,7 @@
3029
import java.util.function.Function;
3130
import java.util.stream.Stream;
3231

32+
import org.neo4j.driver.Query;
3333
import org.neo4j.driver.Value;
3434
import org.neo4j.driver.internal.InternalRecord;
3535
import org.neo4j.driver.internal.async.UnmanagedTransaction;
@@ -48,6 +48,7 @@
4848
import static org.junit.jupiter.api.Assertions.assertThrows;
4949
import static org.mockito.ArgumentMatchers.any;
5050
import static org.mockito.Mockito.mock;
51+
import static org.mockito.Mockito.never;
5152
import static org.mockito.Mockito.verify;
5253
import static org.mockito.Mockito.when;
5354
import static org.neo4j.driver.Values.parameters;
@@ -137,4 +138,45 @@ void shouldMarkTxIfFailedToRun( Function<RxTransaction, RxResult> runReturnOne )
137138
assertThat( t.getCause(), equalTo( error ) );
138139
verify( tx ).markTerminated( error );
139140
}
141+
142+
@Test
143+
void shouldCommitWhenOpen()
144+
{
145+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
146+
when( tx.isOpen() ).thenReturn( true );
147+
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
148+
149+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
150+
Publisher<Void> publisher = rxTx.commitIfOpen();
151+
StepVerifier.create( publisher ).verifyComplete();
152+
153+
verify( tx ).commitAsync();
154+
}
155+
156+
@Test
157+
void shouldNotCommitWhenNotOpen()
158+
{
159+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
160+
when( tx.isOpen() ).thenReturn( false );
161+
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
162+
163+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
164+
Publisher<Void> publisher = rxTx.commitIfOpen();
165+
StepVerifier.create( publisher ).verifyComplete();
166+
167+
verify( tx, never() ).commitAsync();
168+
}
169+
170+
@Test
171+
void shouldDelegateClose()
172+
{
173+
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
174+
when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() );
175+
176+
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
177+
Publisher<Void> publisher = rxTx.close();
178+
StepVerifier.create( publisher ).verifyComplete();
179+
180+
verify( tx ).closeAsync();
181+
}
140182
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,6 @@ public class StartTest implements TestkitRequest
6666
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
6767
"Does not support partially consumed state" );
6868
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
69-
REACTIVE_SKIP_PATTERN_TO_REASON.put(
70-
"^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$",
71-
"Rollback failures following commit failure" );
72-
REACTIVE_SKIP_PATTERN_TO_REASON.put(
73-
"^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$",
74-
"Rollback failures following commit failure" );
75-
skipMessage = "Requires investigation";
7669
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
7770
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
7871
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestAuthorizationV4x1\\..*$", skipMessage );
@@ -81,7 +74,6 @@ public class StartTest implements TestkitRequest
8174
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage );
8275
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage );
8376
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage );
84-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage );
8577
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage );
8678
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
8779
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );

0 commit comments

Comments
 (0)