Skip to content

Commit f6367fe

Browse files
committed
+tck fixes spec303 to not leak "still publishing" publisher
Since the Publisher could stay alive and keep publishing, and keep getting demand in the recursion depth detecting test it would exceed the buffer size by accumulating these onNext signals, and then cause an error during *a different tests execution* Now we do not accumulate these elements, and also set an explicit limit on how long we try to blow up the recursive stack depth. Stack depth is now also logged in as debug information which should help implementers notice how their imple behave in this scenario. The publisher is now always canceled at the end of this test, in case it did not reach the max nr of elements it is allowed to publish here, nor has it signalled completion.
1 parent 8769da9 commit f6367fe

File tree

1 file changed

+42
-6
lines changed

1 file changed

+42
-6
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+42-6
Original file line numberDiff line numberDiff line change
@@ -637,18 +637,33 @@ protected Long initialValue() {
637637
}
638638
};
639639

640+
final Latch runCompleted = new Latch(env);
641+
640642
final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
643+
// counts the number of signals received, used to break out from possibly infinite request/onNext loops
644+
long signalsReceived = 0L;
645+
641646
@Override
642647
public void onNext(T element) {
648+
// NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
649+
// which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
650+
651+
signalsReceived += 1;
643652
stackDepthCounter.set(stackDepthCounter.get() + 1);
644-
super.onNext(element);
653+
env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
645654

646-
final Long callsUntilNow = stackDepthCounter.get();
655+
final long callsUntilNow = stackDepthCounter.get();
647656
if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
648657
env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
649658
callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
650659

651660
// stop the recursive call chain
661+
runCompleted.close();
662+
return;
663+
} else if (signalsReceived >= oneMoreThanBoundedLimit) {
664+
// since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
665+
// stop the recursive call chain
666+
runCompleted.close();
652667
return;
653668
}
654669

@@ -657,14 +672,35 @@ public void onNext(T element) {
657672

658673
stackDepthCounter.set(stackDepthCounter.get() - 1);
659674
}
675+
676+
@Override
677+
public void onComplete() {
678+
super.onComplete();
679+
runCompleted.close();
680+
}
681+
682+
@Override
683+
public void onError(Throwable cause) {
684+
super.onError(cause);
685+
runCompleted.close();
686+
}
660687
};
661688

662-
env.subscribe(pub, sub);
689+
try {
690+
env.subscribe(pub, sub);
663691

664-
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
692+
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
665693

666-
sub.nextElementOrEndOfStream();
667-
env.verifyNoAsyncErrors();
694+
final String msg = String.format("Unable to validate call stack depth safety, " +
695+
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
696+
oneMoreThanBoundedLimit);
697+
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
698+
env.verifyNoAsyncErrors();
699+
} finally {
700+
// since the request/onNext recursive calls may keep the publisher running "forever",
701+
// we MUST cancel it manually before exiting this test case
702+
sub.cancel();
703+
}
668704
}
669705
});
670706
}

0 commit comments

Comments
 (0)