Skip to content

Commit 4af56c3

Browse files
committed
+tck fixes spec303 to not leak "still publishing" publisher
Since the Publisher could stay alive and keep publishing, and keep getting demand in the recursion depth detecting test it would exceed the buffer size by accumulating these onNext signals, and then cause an error during *a different tests execution* Now we do not accumulate these elements, and also set an explicit limit on how long we try to blow up the recursive stack depth. Stack depth is now also logged in as debug information which should help implementers notice how their imple behave in this scenario. The publisher is now always canceled at the end of this test, in case it did not reach the max nr of elements it is allowed to publish here, nor has it signalled completion.
1 parent 8769da9 commit 4af56c3

File tree

2 files changed

+52
-8
lines changed

2 files changed

+52
-8
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+51-6
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,15 @@ public long boundedDepthOfOnNextAndRequestRecursion() {
124124
return 1;
125125
}
126126

127+
/**
128+
* Used to break possibly infinite request/onNext loops.
129+
*
130+
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation.
131+
*/
132+
public long maxOnNextSignalsInRecursionTest() {
133+
return 1000;
134+
}
135+
127136
/**
128137
* The amount of time after which a cancelled Subscriber reference should be dropped.
129138
* See Rule 3.13 for details.
@@ -637,18 +646,33 @@ protected Long initialValue() {
637646
}
638647
};
639648

649+
final Latch runCompleted = new Latch(env);
650+
640651
final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
652+
// counts the number of signals received, used to break out from possibly infinite request/onNext loops
653+
long signalsReceived = 0L;
654+
641655
@Override
642656
public void onNext(T element) {
657+
// NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
658+
// which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
659+
660+
signalsReceived += 1;
643661
stackDepthCounter.set(stackDepthCounter.get() + 1);
644-
super.onNext(element);
662+
env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
645663

646-
final Long callsUntilNow = stackDepthCounter.get();
664+
final long callsUntilNow = stackDepthCounter.get();
647665
if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
648666
env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
649667
callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
650668

651669
// stop the recursive call chain
670+
runCompleted.close();
671+
return;
672+
} else if (signalsReceived > maxOnNextSignalsInRecursionTest()) {
673+
// since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
674+
// stop the recursive call chain
675+
runCompleted.close();
652676
return;
653677
}
654678

@@ -657,14 +681,35 @@ public void onNext(T element) {
657681

658682
stackDepthCounter.set(stackDepthCounter.get() - 1);
659683
}
684+
685+
@Override
686+
public void onComplete() {
687+
super.onComplete();
688+
runCompleted.close();
689+
}
690+
691+
@Override
692+
public void onError(Throwable cause) {
693+
super.onError(cause);
694+
runCompleted.close();
695+
}
660696
};
661697

662-
env.subscribe(pub, sub);
698+
try {
699+
env.subscribe(pub, sub);
663700

664-
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
701+
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
665702

666-
sub.nextElementOrEndOfStream();
667-
env.verifyNoAsyncErrors();
703+
final String msg = String.format("Unable to validate call stack depth safety, " +
704+
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
705+
maxOnNextSignalsInRecursionTest());
706+
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
707+
env.verifyNoAsyncErrors();
708+
} finally {
709+
// since the request/onNext recursive calls may keep the publisher running "forever",
710+
// we MUST cancel it manually before exiting this test case
711+
sub.cancel();
712+
}
668713
}
669714
});
670715
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6-
import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
76
import org.reactivestreams.tck.support.Optional;
7+
import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
88

9-
import java.text.NumberFormat;
109
import java.util.LinkedList;
1110
import java.util.List;
1211
import java.util.concurrent.ArrayBlockingQueue;

0 commit comments

Comments
 (0)