Skip to content

Commit 469d576

Browse files
committed
+tck fixes spec303 to not leak "still publishing" publisher
Since the Publisher could stay alive and keep publishing, and keep getting demand in the recursion depth detecting test it would exceed the buffer size by accumulating these onNext signals, and then cause an error during *a different tests execution* Now we do not accumulate these elements, and also set an explicit limit on how long we try to blow up the recursive stack depth. Stack depth is now also logged in as debug information which should help implementers notice how their imple behave in this scenario. The publisher is now always canceled at the end of this test, in case it did not reach the max nr of elements it is allowed to publish here, nor has it signalled completion.
1 parent 8769da9 commit 469d576

File tree

4 files changed

+44
-34
lines changed

4 files changed

+44
-34
lines changed

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,4 @@ public AsyncSubscriberTest() {
6060
return element;
6161
}
6262

63-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
64-
return super.createHelperPublisher(elements);
65-
}
6663
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

-22
Original file line numberDiff line numberDiff line change
@@ -124,28 +124,6 @@ public boolean skipStochasticTests() {
124124
*/
125125
public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
126126

127-
/**
128-
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
129-
* <p>
130-
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
131-
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
132-
* your Subscriber is able to consume.
133-
* <p>
134-
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
135-
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
136-
* (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
137-
* <ul>
138-
* <li>
139-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
140-
* </li>
141-
* <li>
142-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
143-
* In other words, it should represent a "completed stream".
144-
* </li>
145-
* </ul>
146-
*/
147-
public abstract Publisher<T> createHelperPublisher(long elements);
148-
149127
/**
150128
* Return a Publisher that immediately signals {@code onError} to incoming subscriptions,
151129
* or {@code null} in order to skip them.

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+42-6
Original file line numberDiff line numberDiff line change
@@ -637,18 +637,33 @@ protected Long initialValue() {
637637
}
638638
};
639639

640+
final Latch runCompleted = new Latch(env);
641+
640642
final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
643+
// counts the number of signals received, used to break out from possibly infinite request/onNext loops
644+
long signalsReceived = 0L;
645+
641646
@Override
642647
public void onNext(T element) {
648+
// NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
649+
// which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
650+
651+
signalsReceived += 1;
643652
stackDepthCounter.set(stackDepthCounter.get() + 1);
644-
super.onNext(element);
653+
env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
645654

646-
final Long callsUntilNow = stackDepthCounter.get();
655+
final long callsUntilNow = stackDepthCounter.get();
647656
if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
648657
env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
649658
callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
650659

651660
// stop the recursive call chain
661+
runCompleted.close();
662+
return;
663+
} else if (signalsReceived >= oneMoreThanBoundedLimit) {
664+
// since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
665+
// stop the recursive call chain
666+
runCompleted.close();
652667
return;
653668
}
654669

@@ -657,14 +672,35 @@ public void onNext(T element) {
657672

658673
stackDepthCounter.set(stackDepthCounter.get() - 1);
659674
}
675+
676+
@Override
677+
public void onComplete() {
678+
super.onComplete();
679+
runCompleted.close();
680+
}
681+
682+
@Override
683+
public void onError(Throwable cause) {
684+
super.onError(cause);
685+
runCompleted.close();
686+
}
660687
};
661688

662-
env.subscribe(pub, sub);
689+
try {
690+
env.subscribe(pub, sub);
663691

664-
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
692+
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
665693

666-
sub.nextElementOrEndOfStream();
667-
env.verifyNoAsyncErrors();
694+
final String msg = String.format("Unable to validate call stack depth safety, " +
695+
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
696+
oneMoreThanBoundedLimit);
697+
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
698+
env.verifyNoAsyncErrors();
699+
} finally {
700+
// since the request/onNext recursive calls may keep the publisher running "forever",
701+
// we MUST cancel it manually before exiting this test case
702+
sub.cancel();
703+
}
668704
}
669705
});
670706
}

tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@ public abstract class WithHelperPublisher<T> {
3838
/**
3939
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
4040
* <p>
41-
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
42-
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
43-
* your Subscriber is able to consume.
41+
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use {@link #createElement(int)}
42+
* to generate elements type your Subscriber is able to consume.
4443
* <p>
4544
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
4645
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for

0 commit comments

Comments
 (0)