Skip to content

Commit cda0b17

Browse files
authored
Release connection on reactive beginTransaction cancellation (#1341)
Each transaction created by the driver requires a network connection. Unfinished transactions may result in connection leaks, meaning that connections acquired from the connection pool are not available for further use. Subscription cancellation on reactive `beginTransaction` during transaction creation could result in dangling transaction, leading to the connection leak. This update fixes this issue by ensuring that such transactions are rolled back and their connections are returned to the connection pool.
1 parent 2ae2836 commit cda0b17

File tree

3 files changed

+123
-20
lines changed

3 files changed

+123
-20
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.neo4j.driver.internal.util.Futures;
3434
import org.reactivestreams.Publisher;
3535
import reactor.core.publisher.Flux;
36+
import reactor.core.publisher.Mono;
3637

3738
public abstract class AbstractReactiveSession<S> {
3839
protected final NetworkSession session;
@@ -67,10 +68,11 @@ protected Publisher<S> doBeginTransaction(TransactionConfig config, String txTyp
6768
return txFuture;
6869
},
6970
() -> new IllegalStateException(
70-
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
71+
"Unexpected condition, begin transaction call has completed successfully with transaction being null"),
72+
tx -> Mono.fromDirect(closeTransaction(tx, false)).subscribe());
7173
}
7274

73-
Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
75+
private Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
7476
return createSingleItemPublisher(
7577
() -> {
7678
CompletableFuture<S> txFuture = new CompletableFuture<>();
@@ -84,7 +86,8 @@ Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
8486
return txFuture;
8587
},
8688
() -> new IllegalStateException(
87-
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
89+
"Unexpected condition, begin transaction call has completed successfully with transaction being null"),
90+
tx -> Mono.fromDirect(closeTransaction(tx, false)).subscribe());
8891
}
8992

9093
protected <T> Publisher<T> runTransaction(

driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static java.util.Objects.requireNonNull;
22+
2123
import java.util.Optional;
2224
import java.util.concurrent.CompletionStage;
25+
import java.util.function.Consumer;
2326
import java.util.function.Supplier;
2427
import org.neo4j.driver.internal.util.Futures;
2528
import org.reactivestreams.Publisher;
@@ -28,6 +31,7 @@
2831
public class RxUtils {
2932
/**
3033
* The publisher created by this method will either succeed without publishing anything or fail with an error.
34+
*
3135
* @param supplier supplies a {@link CompletionStage<Void>}.
3236
* @return A publisher that publishes nothing on completion or fails with an error.
3337
*/
@@ -48,23 +52,79 @@ public static <T> Publisher<T> createEmptyPublisher(Supplier<CompletionStage<Voi
4852
* @param supplier supplies a {@link CompletionStage<T>} that MUST produce a non-null result when completed successfully.
4953
* @param nullResultThrowableSupplier supplies a {@link Throwable} that is used as an error when the supplied completion stage completes successfully with
5054
* null.
55+
* @param cancellationHandler handles cancellation, may be used to release associated resources
5156
* @param <T> the type of the item to publish.
5257
* @return A publisher that succeeds exactly one item or fails with an error.
5358
*/
5459
public static <T> Publisher<T> createSingleItemPublisher(
55-
Supplier<CompletionStage<T>> supplier, Supplier<Throwable> nullResultThrowableSupplier) {
56-
return Mono.create(sink -> supplier.get().whenComplete((item, completionError) -> {
57-
if (completionError == null) {
58-
if (item != null) {
59-
sink.success(item);
60-
} else {
61-
sink.error(nullResultThrowableSupplier.get());
60+
Supplier<CompletionStage<T>> supplier,
61+
Supplier<Throwable> nullResultThrowableSupplier,
62+
Consumer<T> cancellationHandler) {
63+
requireNonNull(supplier, "supplier must not be null");
64+
requireNonNull(nullResultThrowableSupplier, "nullResultThrowableSupplier must not be null");
65+
requireNonNull(cancellationHandler, "cancellationHandler must not be null");
66+
return Mono.create(sink -> {
67+
var state = new SinkState<T>();
68+
sink.onRequest(ignored -> {
69+
CompletionStage<T> stage;
70+
synchronized (state) {
71+
if (state.isCancelled()) {
72+
return;
73+
}
74+
if (state.getStage() != null) {
75+
return;
76+
}
77+
stage = supplier.get();
78+
state.setStage(stage);
6279
}
63-
} else {
64-
Throwable error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
65-
.orElse(completionError);
66-
sink.error(error);
67-
}
68-
}));
80+
stage.whenComplete((item, completionError) -> {
81+
if (completionError == null) {
82+
if (item != null) {
83+
sink.success(item);
84+
} else {
85+
sink.error(nullResultThrowableSupplier.get());
86+
}
87+
} else {
88+
Throwable error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
89+
.orElse(completionError);
90+
sink.error(error);
91+
}
92+
});
93+
});
94+
sink.onCancel(() -> {
95+
CompletionStage<T> stage;
96+
synchronized (state) {
97+
if (state.isCancelled()) {
98+
return;
99+
}
100+
state.setCancelled(true);
101+
stage = state.getStage();
102+
}
103+
if (stage != null) {
104+
stage.whenComplete((value, ignored) -> cancellationHandler.accept(value));
105+
}
106+
});
107+
});
108+
}
109+
110+
private static class SinkState<T> {
111+
private CompletionStage<T> stage;
112+
private boolean cancelled;
113+
114+
public CompletionStage<T> getStage() {
115+
return stage;
116+
}
117+
118+
public void setStage(CompletionStage<T> stage) {
119+
this.stage = stage;
120+
}
121+
122+
public boolean isCancelled() {
123+
return cancelled;
124+
}
125+
126+
public void setCancelled(boolean cancelled) {
127+
this.cancelled = cancelled;
128+
}
69129
}
70130
}

driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,22 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static org.mockito.BDDMockito.then;
2122
import static org.mockito.Mockito.mock;
2223
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
2324
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
2425
import static org.neo4j.driver.internal.util.Futures.failedFuture;
2526

2627
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CompletionStage;
29+
import java.util.function.Consumer;
2730
import java.util.function.Predicate;
31+
import java.util.function.Supplier;
2832
import org.junit.jupiter.api.Test;
2933
import org.neo4j.driver.internal.util.Futures;
3034
import org.reactivestreams.Publisher;
35+
import org.reactivestreams.Subscription;
36+
import reactor.core.publisher.BaseSubscriber;
3137
import reactor.test.StepVerifier;
3238

3339
class RxUtilsTest {
@@ -47,24 +53,58 @@ void emptyPublisherShouldErrorWhenSupplierErrors() {
4753

4854
@Test
4955
void singleItemPublisherShouldCompleteWithValue() {
50-
Publisher<String> publisher =
51-
createSingleItemPublisher(() -> CompletableFuture.completedFuture("One"), () -> mock(Throwable.class));
56+
Publisher<String> publisher = createSingleItemPublisher(
57+
() -> CompletableFuture.completedFuture("One"), () -> mock(Throwable.class), (ignored) -> {});
5258
StepVerifier.create(publisher).expectNext("One").verifyComplete();
5359
}
5460

5561
@Test
5662
void singleItemPublisherShouldErrorWhenFutureCompletesWithNull() {
5763
Throwable error = mock(Throwable.class);
58-
Publisher<String> publisher = createSingleItemPublisher(Futures::completedWithNull, () -> error);
64+
Publisher<String> publisher =
65+
createSingleItemPublisher(Futures::completedWithNull, () -> error, (ignored) -> {});
5966

6067
StepVerifier.create(publisher).verifyErrorMatches(actualError -> error == actualError);
6168
}
6269

6370
@Test
6471
void singleItemPublisherShouldErrorWhenSupplierErrors() {
6572
RuntimeException error = mock(RuntimeException.class);
66-
Publisher<String> publisher = createSingleItemPublisher(() -> failedFuture(error), () -> mock(Throwable.class));
73+
Publisher<String> publisher =
74+
createSingleItemPublisher(() -> failedFuture(error), () -> mock(Throwable.class), (ignored) -> {});
6775

6876
StepVerifier.create(publisher).verifyErrorMatches(actualError -> error == actualError);
6977
}
78+
79+
@Test
80+
void singleItemPublisherShouldHandleCancellationAfterRequestProcessingBegins() {
81+
// GIVEN
82+
var value = "value";
83+
var valueFuture = new CompletableFuture<String>();
84+
var supplierInvokedFuture = new CompletableFuture<Void>();
85+
Supplier<CompletionStage<String>> valueFutureSupplier = () -> {
86+
supplierInvokedFuture.complete(null);
87+
return valueFuture;
88+
};
89+
@SuppressWarnings("unchecked")
90+
Consumer<String> cancellationHandler = mock(Consumer.class);
91+
var publisher =
92+
createSingleItemPublisher(valueFutureSupplier, () -> mock(Throwable.class), cancellationHandler);
93+
94+
// WHEN
95+
publisher.subscribe(new BaseSubscriber<>() {
96+
@Override
97+
protected void hookOnSubscribe(Subscription subscription) {
98+
subscription.request(1);
99+
supplierInvokedFuture.thenAccept(ignored -> {
100+
subscription.cancel();
101+
valueFuture.complete(value);
102+
});
103+
}
104+
});
105+
106+
// THEN
107+
valueFuture.join();
108+
then(cancellationHandler).should().accept(value);
109+
}
70110
}

0 commit comments

Comments
 (0)