Skip to content

Commit 9379327

Browse files
committed
Release connection on reactive beginTransaction cancellation
Each transaction created by the driver requires a network connection. Unfinished transactions may result in connection leaks, meaning that connections acquired from the connection pool are not available for further use. Subscription cancellation on reactive `beginTransaction` during transaction creation could result in dangling transaction, leading to the connection leak. This update fixes this issue by ensuring that such transactions are rolled back and their connections are returned to the connection pool.
1 parent a0f8599 commit 9379327

File tree

3 files changed

+122
-19
lines changed

3 files changed

+122
-19
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.neo4j.driver.reactive.RxTransactionWork;
3838
import org.reactivestreams.Publisher;
3939
import reactor.core.publisher.Flux;
40+
import reactor.core.publisher.Mono;
4041

4142
public class InternalRxSession extends AbstractRxQueryRunner implements RxSession {
4243
private final NetworkSession session;
@@ -69,7 +70,8 @@ public Publisher<RxTransaction> beginTransaction(TransactionConfig config) {
6970
return txFuture;
7071
},
7172
() -> new IllegalStateException(
72-
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
73+
"Unexpected condition, begin transaction call has completed successfully with transaction being null"),
74+
(tx) -> Mono.fromDirect(tx.close()).subscribe());
7375
}
7476

7577
private Publisher<InternalRxTransaction> beginTransaction(AccessMode mode, TransactionConfig config) {
@@ -86,7 +88,8 @@ private Publisher<InternalRxTransaction> beginTransaction(AccessMode mode, Trans
8688
return txFuture;
8789
},
8890
() -> new IllegalStateException(
89-
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
91+
"Unexpected condition, begin transaction call has completed successfully with transaction being null"),
92+
(tx) -> Mono.fromDirect(tx.close()).subscribe());
9093
}
9194

9295
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static java.util.Objects.requireNonNull;
22+
2123
import java.util.Optional;
2224
import java.util.concurrent.CompletionStage;
25+
import java.util.function.Consumer;
2326
import java.util.function.Supplier;
2427
import org.neo4j.driver.internal.util.Futures;
2528
import org.reactivestreams.Publisher;
@@ -28,6 +31,7 @@
2831
public class RxUtils {
2932
/**
3033
* The publisher created by this method will either succeed without publishing anything or fail with an error.
34+
*
3135
* @param supplier supplies a {@link CompletionStage<Void>}.
3236
* @return A publisher that publishes nothing on completion or fails with an error.
3337
*/
@@ -48,23 +52,79 @@ public static <T> Publisher<T> createEmptyPublisher(Supplier<CompletionStage<Voi
4852
* @param supplier supplies a {@link CompletionStage<T>} that MUST produce a non-null result when completed successfully.
4953
* @param nullResultThrowableSupplier supplies a {@link Throwable} that is used as an error when the supplied completion stage completes successfully with
5054
* null.
55+
* @param cancellationHandler handles cancellation, may be used to release associated resources
5156
* @param <T> the type of the item to publish.
5257
* @return A publisher that succeeds exactly one item or fails with an error.
5358
*/
5459
public static <T> Publisher<T> createSingleItemPublisher(
55-
Supplier<CompletionStage<T>> supplier, Supplier<Throwable> nullResultThrowableSupplier) {
56-
return Mono.create(sink -> supplier.get().whenComplete((item, completionError) -> {
57-
if (completionError == null) {
58-
if (item != null) {
59-
sink.success(item);
60-
} else {
61-
sink.error(nullResultThrowableSupplier.get());
60+
Supplier<CompletionStage<T>> supplier,
61+
Supplier<Throwable> nullResultThrowableSupplier,
62+
Consumer<T> cancellationHandler) {
63+
requireNonNull(supplier, "supplier must not be null");
64+
requireNonNull(nullResultThrowableSupplier, "nullResultThrowableSupplier must not be null");
65+
requireNonNull(cancellationHandler, "cancellationHandler must not be null");
66+
return Mono.create(sink -> {
67+
SinkState state = new SinkState<T>();
68+
sink.onRequest(ignored -> {
69+
CompletionStage<T> stage;
70+
synchronized (state) {
71+
if (state.isCancelled()) {
72+
return;
73+
}
74+
if (state.getStage() != null) {
75+
return;
76+
}
77+
stage = supplier.get();
78+
state.setStage(stage);
6279
}
63-
} else {
64-
Throwable error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
65-
.orElse(completionError);
66-
sink.error(error);
67-
}
68-
}));
80+
stage.whenComplete((item, completionError) -> {
81+
if (completionError == null) {
82+
if (item != null) {
83+
sink.success(item);
84+
} else {
85+
sink.error(nullResultThrowableSupplier.get());
86+
}
87+
} else {
88+
Throwable error = Optional.ofNullable(Futures.completionExceptionCause(completionError))
89+
.orElse(completionError);
90+
sink.error(error);
91+
}
92+
});
93+
});
94+
sink.onCancel(() -> {
95+
CompletionStage<T> stage;
96+
synchronized (state) {
97+
if (state.isCancelled()) {
98+
return;
99+
}
100+
state.setCancelled(true);
101+
stage = state.getStage();
102+
}
103+
if (stage != null) {
104+
stage.whenComplete((value, ignored) -> cancellationHandler.accept(value));
105+
}
106+
});
107+
});
108+
}
109+
110+
private static class SinkState<T> {
111+
private CompletionStage<T> stage;
112+
private boolean cancelled;
113+
114+
public CompletionStage<T> getStage() {
115+
return stage;
116+
}
117+
118+
public void setStage(CompletionStage<T> stage) {
119+
this.stage = stage;
120+
}
121+
122+
public boolean isCancelled() {
123+
return cancelled;
124+
}
125+
126+
public void setCancelled(boolean cancelled) {
127+
this.cancelled = cancelled;
128+
}
69129
}
70130
}

driver/src/test/java/org/neo4j/driver/internal/reactive/RxUtilsTest.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,22 @@
1818
*/
1919
package org.neo4j.driver.internal.reactive;
2020

21+
import static org.mockito.BDDMockito.then;
2122
import static org.mockito.Mockito.mock;
2223
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
2324
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
2425
import static org.neo4j.driver.internal.util.Futures.failedFuture;
2526

2627
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CompletionStage;
29+
import java.util.function.Consumer;
2730
import java.util.function.Predicate;
31+
import java.util.function.Supplier;
2832
import org.junit.jupiter.api.Test;
2933
import org.neo4j.driver.internal.util.Futures;
3034
import org.reactivestreams.Publisher;
35+
import org.reactivestreams.Subscription;
36+
import reactor.core.publisher.BaseSubscriber;
3137
import reactor.test.StepVerifier;
3238

3339
class RxUtilsTest {
@@ -47,24 +53,58 @@ void emptyPublisherShouldErrorWhenSupplierErrors() {
4753

4854
@Test
4955
void singleItemPublisherShouldCompleteWithValue() {
50-
Publisher<String> publisher =
51-
createSingleItemPublisher(() -> CompletableFuture.completedFuture("One"), () -> mock(Throwable.class));
56+
Publisher<String> publisher = createSingleItemPublisher(
57+
() -> CompletableFuture.completedFuture("One"), () -> mock(Throwable.class), (ignored) -> {});
5258
StepVerifier.create(publisher).expectNext("One").verifyComplete();
5359
}
5460

5561
@Test
5662
void singleItemPublisherShouldErrorWhenFutureCompletesWithNull() {
5763
Throwable error = mock(Throwable.class);
58-
Publisher<String> publisher = createSingleItemPublisher(Futures::completedWithNull, () -> error);
64+
Publisher<String> publisher =
65+
createSingleItemPublisher(Futures::completedWithNull, () -> error, (ignored) -> {});
5966

6067
StepVerifier.create(publisher).verifyErrorMatches(actualError -> error == actualError);
6168
}
6269

6370
@Test
6471
void singleItemPublisherShouldErrorWhenSupplierErrors() {
6572
RuntimeException error = mock(RuntimeException.class);
66-
Publisher<String> publisher = createSingleItemPublisher(() -> failedFuture(error), () -> mock(Throwable.class));
73+
Publisher<String> publisher =
74+
createSingleItemPublisher(() -> failedFuture(error), () -> mock(Throwable.class), (ignored) -> {});
6775

6876
StepVerifier.create(publisher).verifyErrorMatches(actualError -> error == actualError);
6977
}
78+
79+
@Test
80+
void singleItemPublisherShouldHandleCancellationAfterRequestProcessingBegins() {
81+
// GIVEN
82+
String value = "value";
83+
CompletableFuture<String> valueFuture = new CompletableFuture<>();
84+
CompletableFuture<Void> supplierInvokedFuture = new CompletableFuture<>();
85+
Supplier<CompletionStage<String>> valueFutureSupplier = () -> {
86+
supplierInvokedFuture.complete(null);
87+
return valueFuture;
88+
};
89+
@SuppressWarnings("unchecked")
90+
Consumer<String> cancellationHandler = mock(Consumer.class);
91+
Publisher<String> publisher =
92+
createSingleItemPublisher(valueFutureSupplier, () -> mock(Throwable.class), cancellationHandler);
93+
94+
// WHEN
95+
publisher.subscribe(new BaseSubscriber<String>() {
96+
@Override
97+
protected void hookOnSubscribe(Subscription subscription) {
98+
subscription.request(1);
99+
supplierInvokedFuture.thenAccept(ignored -> {
100+
subscription.cancel();
101+
valueFuture.complete(value);
102+
});
103+
}
104+
});
105+
106+
// THEN
107+
valueFuture.join();
108+
then(cancellationHandler).should().accept(value);
109+
}
70110
}

0 commit comments

Comments
 (0)