Skip to content

Commit 33fab3d

Browse files
committed
Coupled cancel from onComplete or onError workaround
* microprofile/microprofile-reactive-streams-operators#131 Signed-off-by: Daniel Kec <[email protected]>
1 parent e3ead54 commit 33fab3d

File tree

4 files changed

+24
-174
lines changed

4 files changed

+24
-174
lines changed

common/reactive/src/main/java/io/helidon/common/reactive/MultiCoupledProcessor.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package io.helidon.common.reactive;
1919

20+
import java.util.LinkedList;
2021
import java.util.Objects;
2122
import java.util.Optional;
23+
import java.util.Queue;
2224
import java.util.concurrent.Flow;
2325
import java.util.concurrent.atomic.AtomicBoolean;
2426

@@ -50,6 +52,7 @@ public class MultiCoupledProcessor<T, R> implements Flow.Processor<T, R>, Multi<
5052
private Flow.Subscription inletSubscription;
5153
private Flow.Subscription passedInPublisherSubscription;
5254
private AtomicBoolean cancelled = new AtomicBoolean(false);
55+
private Queue<Runnable> forbiddenCalls = new LinkedList<>();
5356

5457
private MultiCoupledProcessor(Flow.Subscriber<T> passedInSubscriber, Flow.Publisher<R> passedInPublisher) {
5558
this.passedInSubscriber = SequentialSubscriber.create(passedInSubscriber);
@@ -78,7 +81,7 @@ public void subscribe(Flow.Subscriber<? super R> outletSubscriber) {
7881

7982
@Override
8083
public void onSubscribe(Flow.Subscription passedInPublisherSubscription) {
81-
//Passed in publisher called onSubscribed
84+
//Passed in publisher called onSubscribe
8285
Objects.requireNonNull(passedInPublisherSubscription);
8386
// https://github.com/reactive-streams/reactive-streams-jvm#2.5
8487
if (Objects.nonNull(MultiCoupledProcessor.this.passedInPublisherSubscription) || cancelled.get()) {
@@ -91,6 +94,7 @@ public void onSubscribe(Flow.Subscription passedInPublisherSubscription) {
9194
@Override
9295
@SuppressWarnings("unchecked")
9396
public void onNext(R t) {
97+
tryForbiddenCalls();
9498
//Passed in publisher sent onNext
9599
Objects.requireNonNull(t);
96100
outletSubscriber.onNext(t);
@@ -105,7 +109,7 @@ public void onError(Throwable t) {
105109
passedInSubscriber.onError(t);
106110
inletSubscriber.onError(t);
107111
//203 https://github.com/eclipse/microprofile-reactive-streams-operators/issues/131
108-
Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel);
112+
forbiddenCalls.offer(() -> Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel));
109113
}
110114

111115
@Override
@@ -115,7 +119,7 @@ public void onComplete() {
115119
outletSubscriber.onComplete();
116120
passedInSubscriber.onComplete();
117121
//203 https://github.com/eclipse/microprofile-reactive-streams-operators/issues/131
118-
Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel);
122+
forbiddenCalls.offer(() -> Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel));
119123
}
120124
});
121125

@@ -124,11 +128,8 @@ public void onComplete() {
124128
@Override
125129
public void request(long n) {
126130
// Request from outlet subscriber
127-
StreamValidationUtils.checkRecursionDepth(
128-
"MultiCoupledProcessor1",
129-
8,
130-
() -> passedInPublisherSubscription.request(n),
131-
(actDepth, t) -> outletSubscriber.onError(t));
131+
passedInPublisherSubscription.request(n);
132+
tryForbiddenCalls();
132133
}
133134

134135
@Override
@@ -137,6 +138,7 @@ public void cancel() {
137138
passedInSubscriber.onComplete();
138139
Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel);
139140
passedInPublisherSubscription.cancel();
141+
tryForbiddenCalls();
140142
}
141143
});
142144
}
@@ -153,11 +155,8 @@ public void onSubscribe(Flow.Subscription inletSubscription) {
153155
passedInSubscriber.onSubscribe(new Flow.Subscription() {
154156
@Override
155157
public void request(long n) {
156-
StreamValidationUtils.checkRecursionDepth(
157-
"MultiCoupledProcessor2",
158-
8,
159-
() -> inletSubscription.request(n),
160-
(actDepth, t) -> passedInSubscriber.onError(t));
158+
inletSubscription.request(n);
159+
tryForbiddenCalls();
161160
}
162161

163162
@Override
@@ -169,6 +168,7 @@ public void cancel() {
169168
inletSubscription.cancel();
170169
outletSubscriber.onComplete();
171170
passedInPublisherSubscription.cancel();
171+
tryForbiddenCalls();
172172
}
173173
});
174174
}
@@ -197,4 +197,14 @@ public void onComplete() {
197197
passedInPublisherSubscription.cancel();
198198
}
199199

200+
private void tryForbiddenCalls() {
201+
while (true) {
202+
Runnable polledCall = forbiddenCalls.poll();
203+
if (Objects.isNull(polledCall)) {
204+
return;
205+
}
206+
polledCall.run();
207+
}
208+
}
209+
200210
}

common/reactive/src/main/java/io/helidon/common/reactive/StreamValidationUtils.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,70 +17,18 @@
1717

1818
package io.helidon.common.reactive;
1919

20-
import java.util.HashMap;
21-
import java.util.Map;
2220
import java.util.Objects;
23-
import java.util.Optional;
24-
import java.util.concurrent.atomic.AtomicLong;
25-
import java.util.function.BiConsumer;
2621
import java.util.function.Consumer;
2722

2823
/**
2924
* Helper methods for stream validation.
3025
*/
3126
public class StreamValidationUtils {
3227

33-
private static ThreadLocal<Map<String, AtomicLong>> recursionDepthThreadLocal = new ThreadLocal<>();
34-
3528

3629
private StreamValidationUtils() {
3730
}
3831

39-
/**
40-
* Validation of Reactive Streams Specification for JVM rule 3.3.
41-
* <br>
42-
* {@code Subscription.request} MUST place an upper bound on possible synchronous
43-
* recursion between {@code Publisher} and {@code Subscriber}.
44-
*
45-
* @param key key to differentiate multiple checks on one thread
46-
* @param maxDepth maximal expected recursion depth
47-
* @param guardedBlock code block to check recursion for
48-
* @param onExceeded called if recursion is deeper than maxDepth,
49-
* provided with actual depth and spec compliant exception.
50-
* @param <T> payload type of the subscriber
51-
* @return true if valid
52-
* @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive-streams/reactive-streams-jvm#3.3</a>
53-
*/
54-
public static <T> boolean checkRecursionDepth(String key, int maxDepth, Runnable guardedBlock,
55-
BiConsumer<Long, Throwable> onExceeded) {
56-
Map<String, AtomicLong> counterMap = recursionDepthThreadLocal.get();
57-
if (Objects.isNull(counterMap)) {
58-
counterMap = new HashMap<>();
59-
counterMap.put(key, new AtomicLong(0));
60-
recursionDepthThreadLocal.set(counterMap);
61-
}
62-
63-
counterMap.putIfAbsent(key, new AtomicLong(0));
64-
AtomicLong recursionDepthCounter = counterMap.get(key);
65-
try {
66-
if (recursionDepthCounter.incrementAndGet() > maxDepth) {
67-
long exceededRecursionDepth = recursionDepthCounter.get();
68-
Optional.of(onExceeded)
69-
.ifPresent(onExc -> onExc
70-
.accept(exceededRecursionDepth, new IllegalCallerException(String
71-
.format("Recursion depth exceeded, max depth expected %d but actual is %d, rule 3.3",
72-
maxDepth, exceededRecursionDepth))));
73-
return false;
74-
}
75-
76-
77-
guardedBlock.run();
78-
return true;
79-
} finally {
80-
recursionDepthCounter.decrementAndGet();
81-
}
82-
}
83-
8432
/**
8533
* Validation of Reactive Streams Specification for JVM rule 3.9.
8634
* <br>

common/reactive/src/test/java/io/helidon/common/reactive/RecursionDepthTest.java

Lines changed: 0 additions & 100 deletions
This file was deleted.

microprofile/tests/tck/tck-reactive-operators/src/test/java/io/helidon/microprofile/reactive/HelidonReactiveStreamsTckTest.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,16 @@
1818
package io.helidon.microprofile.reactive;
1919

2020
import org.eclipse.microprofile.reactive.streams.operators.tck.ReactiveStreamsTck;
21-
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.CoupledStageVerification;
2221
import org.reactivestreams.tck.TestEnvironment;
2322

2423
public class HelidonReactiveStreamsTckTest extends ReactiveStreamsTck<HelidonReactiveStreamEngine> {
2524

2625
public HelidonReactiveStreamsTckTest() {
27-
super(new TestEnvironment(200, 200, false));
26+
super(new TestEnvironment(200));
2827
}
2928

3029
@Override
3130
protected HelidonReactiveStreamEngine createEngine() {
3231
return new HelidonReactiveStreamEngine();
3332
}
34-
35-
@Override
36-
protected boolean isEnabled(Object test) {
37-
// Remove when TCK test issues are solved
38-
return // https://github.com/eclipse/microprofile-reactive-streams-operators/issues/131
39-
!(test instanceof CoupledStageVerification.ProcessorVerification);
40-
}
4133
}

0 commit comments

Comments
 (0)