Skip to content

Commit f58cc7c

Browse files
committed
+tck fixes spec303 to not leak "still publishing" publisher
Since the Publisher could stay alive and keep publishing, and keep getting demand in the recursion depth detecting test it would exceed the buffer size by accumulating these onNext signals, and then cause an error during *a different tests execution* Now we do not accumulate these elements, and also set an explicit limit on how long we try to blow up the recursive stack depth. Stack depth is now also logged in as debug information which should help implementers notice how their imple behave in this scenario. The publisher is now always canceled at the end of this test, in case it did not reach the max nr of elements it is allowed to publish here, nor has it signalled completion.
1 parent 8769da9 commit f58cc7c

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+46-3
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,15 @@ public long boundedDepthOfOnNextAndRequestRecursion() {
124124
return 1;
125125
}
126126

127+
/**
128+
* Used to break possibly infinite request/onNext loops.
129+
*
130+
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation.
131+
*/
132+
public long maxOnNextSignalsInReccursionTest() {
133+
return 1000;
134+
}
135+
127136
/**
128137
* The amount of time after which a cancelled Subscriber reference should be dropped.
129138
* See Rule 3.13 for details.
@@ -637,18 +646,33 @@ protected Long initialValue() {
637646
}
638647
};
639648

649+
final Latch runCompleted = new Latch(env);
650+
640651
final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
652+
// counts the number of signals received, used to break out from possibly infinite request/onNext loops
653+
long signalsReceived = 0L;
654+
641655
@Override
642656
public void onNext(T element) {
657+
// NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
658+
// which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
659+
660+
signalsReceived += 1;
643661
stackDepthCounter.set(stackDepthCounter.get() + 1);
644-
super.onNext(element);
662+
env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
645663

646-
final Long callsUntilNow = stackDepthCounter.get();
664+
final long callsUntilNow = stackDepthCounter.get();
647665
if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
648666
env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
649667
callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
650668

651669
// stop the recursive call chain
670+
runCompleted.close();
671+
return;
672+
} else if (signalsReceived > maxOnNextSignalsInReccursionTest()) {
673+
// since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
674+
// stop the recursive call chain
675+
runCompleted.close();
652676
return;
653677
}
654678

@@ -657,14 +681,33 @@ public void onNext(T element) {
657681

658682
stackDepthCounter.set(stackDepthCounter.get() - 1);
659683
}
684+
685+
@Override
686+
public void onComplete() {
687+
super.onComplete();
688+
runCompleted.close();
689+
}
690+
691+
@Override
692+
public void onError(Throwable cause) {
693+
super.onError(cause);
694+
runCompleted.close();
695+
}
660696
};
661697

662698
env.subscribe(pub, sub);
663699

664700
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
665701

666-
sub.nextElementOrEndOfStream();
702+
final String msg = String.format("Unable to validate call stack depth safety, " +
703+
"awaited at-most %s signals (`maxOnNextSignalsInReccursionTest()`) or completion",
704+
maxOnNextSignalsInReccursionTest());
705+
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
667706
env.verifyNoAsyncErrors();
707+
708+
// since the request/onNext recursive calls may keep the publisher running "forever",
709+
// we MUST cancel it manually before exiting this test case
710+
sub.cancel();
668711
}
669712
});
670713
}

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.tck.TestEnvironment.*;
7-
import org.reactivestreams.tck.support.Function;
87
import org.reactivestreams.tck.support.Optional;
9-
import org.reactivestreams.tck.support.TestException;
108
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
9+
import org.reactivestreams.tck.support.TestException;
1110
import org.testng.SkipException;
1211
import org.testng.annotations.AfterClass;
1312
import org.testng.annotations.BeforeClass;

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
538538

539539
@Override
540540
public void onNext(T element) {
541-
env.debug(String.format("%s::onNext(%s)", this, element));
541+
env.debug(String.format("%s(blackhole)::onNext(%s)", this, element));
542542
if (!subscription.isCompleted()) {
543543
env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
544544
}

0 commit comments

Comments
 (0)