Skip to content

Commit cc7f204

Browse files
committed
Allow skipping tests which require more subscribers
if your impl is not able to support multiple subscribers
1 parent 4738a43 commit cc7f204

File tree

5 files changed

+128
-66
lines changed

5 files changed

+128
-66
lines changed

tck/src/main/java/org/reactivestreams/tck/Annotations.java

+10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ private Annotations() {}
3333
static @interface Required {
3434
}
3535

36+
/**
37+
* Used to mark how many subscribers will be used on the publisher under test.
38+
* If an implementation declares it does not support enough subscribers as needed by such test, it will be SKIPPed instead of FAILed.
39+
*/
40+
@Target(ElementType.METHOD)
41+
@Retention(RetentionPolicy.SOURCE)
42+
static @interface Subscribers {
43+
long value() default 1;
44+
}
45+
3646
/**
3747
* Used to mark tests which may be skipped / not implemented by certain implementations.
3848
* These tests can be skipped by returning null from the given factory method.

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+107-60
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import org.reactivestreams.Publisher;
55
import org.reactivestreams.Subscriber;
66
import org.reactivestreams.Subscription;
7+
import org.reactivestreams.tck.Annotations.Subscribers;
78
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
89
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
910
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
1011
import org.reactivestreams.tck.TestEnvironment.Promise;
12+
import org.reactivestreams.tck.support.Function;
1113
import org.testng.annotations.BeforeMethod;
1214
import org.testng.annotations.Test;
1315

@@ -88,6 +90,11 @@ public long maxElementsFromPublisher() {
8890
public long boundedDepthOfOnNextAndRequestRecursion() {
8991
return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
9092
}
93+
94+
@Override
95+
public boolean skipStochasticTests() {
96+
return IdentityProcessorVerification.this.skipStochasticTests();
97+
}
9198
};
9299
}
93100

@@ -130,17 +137,36 @@ public long maxElementsFromPublisher() {
130137
}
131138

132139
/**
133-
* In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
134-
* {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
135-
* recursive calls to exceed the number returned by this method.
136-
*
137-
* @see <a href="https://github.com/reactive-streams/reactive-streams#3.3">reactive streams spec, rule 3.3</a>
138-
* @see PublisherVerification#spec303_mustNotAllowUnboundedRecursion()
139-
*/
140+
* In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
141+
* {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
142+
* recursive calls to exceed the number returned by this method.
143+
*
144+
* @see <a href="https://github.com/reactive-streams/reactive-streams#3.3">reactive streams spec, rule 3.3</a>
145+
* @see PublisherVerification#spec303_mustNotAllowUnboundedRecursion()
146+
*/
140147
public long boundedDepthOfOnNextAndRequestRecursion() {
141148
return 1;
142149
}
143150

151+
/**
152+
* Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
153+
* Such tests MAY sometimes fail even though the impl
154+
*/
155+
public boolean skipStochasticTests() {
156+
return false;
157+
}
158+
159+
/**
160+
* Describes the tested implementation in terms of how many subscribers they can support.
161+
* Some tests require the {@code Publisher} under test to support multiple Subscribers,
162+
* yet the spec does not require all publishers to be able to do so, thus – if an implementation
163+
* supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
164+
* you MUST return that number from this method by overriding it.
165+
*/
166+
public long maxSupportedSubscribers() {
167+
return Long.MAX_VALUE;
168+
}
169+
144170
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
145171

146172
@BeforeMethod
@@ -310,31 +336,37 @@ public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue()
310336

311337
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
312338
// for multiple subscribers
313-
@Test
314-
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
315-
new TestSetup(env, processorBufferSize) {{
316-
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
317-
env.subscribe(processor, sub1);
339+
@Test @Subscribers(2)
340+
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
341+
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
342+
@Override
343+
public TestSetup apply(Long aLong) throws Throwable {
344+
return new TestSetup(env, processorBufferSize) {{
345+
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
346+
env.subscribe(processor, sub1);
318347

319-
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
320-
env.subscribe(processor, sub2);
348+
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
349+
env.subscribe(processor, sub2);
321350

322-
sub1.request(1);
323-
expectRequest();
324-
final T x = sendNextTFromUpstream();
325-
expectNextElement(sub1, x);
326-
sub1.request(1);
351+
sub1.request(1);
352+
expectRequest();
353+
final T x = sendNextTFromUpstream();
354+
expectNextElement(sub1, x);
355+
sub1.request(1);
327356

328-
// sub1 has received one element, and has one demand pending
329-
// sub2 has not yet requested anything
357+
// sub1 has received one element, and has one demand pending
358+
// sub2 has not yet requested anything
330359

331-
final Exception ex = new RuntimeException("Test exception");
332-
sendError(ex);
333-
sub1.expectError(ex);
334-
sub2.expectError(ex);
360+
final Exception ex = new RuntimeException("Test exception");
361+
sendError(ex);
362+
sub1.expectError(ex);
363+
sub2.expectError(ex);
335364

336-
env.verifyNoAsyncErrors();
337-
}};
365+
env.verifyNoAsyncErrors();
366+
}};
367+
368+
}
369+
});
338370
}
339371

340372
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
@@ -580,49 +612,54 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
580612

581613
// A Processor
582614
// must trigger `requestFromUpstream` for elements that have been requested 'long ago'
583-
@Test
584-
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Exception {
585-
new TestSetup(env, processorBufferSize) {{
586-
ManualSubscriber<T> sub1 = newSubscriber();
587-
sub1.request(20);
615+
@Test @Subscribers(2)
616+
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
617+
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
618+
@Override
619+
public TestSetup apply(Long subscribers) throws Throwable {
620+
return new TestSetup(env, processorBufferSize) {{
621+
ManualSubscriber<T> sub1 = newSubscriber();
622+
sub1.request(20);
588623

589-
long totalRequests = expectRequest();
590-
final T x = sendNextTFromUpstream();
591-
expectNextElement(sub1, x);
624+
long totalRequests = expectRequest();
625+
final T x = sendNextTFromUpstream();
626+
expectNextElement(sub1, x);
592627

593-
if (totalRequests == 1) {
594-
totalRequests += expectRequest();
595-
}
596-
final T y = sendNextTFromUpstream();
597-
expectNextElement(sub1, y);
628+
if (totalRequests == 1) {
629+
totalRequests += expectRequest();
630+
}
631+
final T y = sendNextTFromUpstream();
632+
expectNextElement(sub1, y);
598633

599-
if (totalRequests == 2) {
600-
totalRequests += expectRequest();
601-
}
634+
if (totalRequests == 2) {
635+
totalRequests += expectRequest();
636+
}
602637

603-
ManualSubscriber<T> sub2 = newSubscriber();
638+
final ManualSubscriber<T> sub2 = newSubscriber();
604639

605-
// sub1 now has 18 pending
606-
// sub2 has 0 pending
640+
// sub1 now has 18 pending
641+
// sub2 has 0 pending
607642

608-
final T z = sendNextTFromUpstream();
609-
expectNextElement(sub1, z);
610-
sub2.expectNone(); // since sub2 hasn't requested anything yet
643+
final T z = sendNextTFromUpstream();
644+
expectNextElement(sub1, z);
645+
sub2.expectNone(); // since sub2 hasn't requested anything yet
611646

612-
sub2.request(1);
613-
expectNextElement(sub2, z);
647+
sub2.request(1);
648+
expectNextElement(sub2, z);
614649

615-
if (totalRequests == 3) {
616-
expectRequest();
617-
}
650+
if (totalRequests == 3) {
651+
expectRequest();
652+
}
618653

619-
// to avoid error messages during test harness shutdown
620-
sendCompletion();
621-
sub1.expectCompletion(env.defaultTimeoutMillis());
622-
sub2.expectCompletion(env.defaultTimeoutMillis());
654+
// to avoid error messages during test harness shutdown
655+
sendCompletion();
656+
sub1.expectCompletion(env.defaultTimeoutMillis());
657+
sub2.expectCompletion(env.defaultTimeoutMillis());
623658

624-
env.verifyNoAsyncErrors();
625-
}};
659+
env.verifyNoAsyncErrors();
660+
}};
661+
}
662+
});
626663
}
627664

628665
/////////////////////// TEST INFRASTRUCTURE //////////////////////
@@ -635,6 +672,16 @@ public void notVerified(String message) {
635672
publisherVerification.notVerified(message);
636673
}
637674

675+
/**
676+
* Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
677+
*/
678+
public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
679+
if (requiredSubscribersSupport > maxSupportedSubscribers())
680+
notVerified("The Publisher under test only supports " +maxSupportedSubscribers()+ " subscribers, " +
681+
"while this test requires at least " + requiredSubscribersSupport + "to run.");
682+
else body.apply(requiredSubscribersSupport);
683+
}
684+
638685
public abstract class TestSetup extends ManualPublisher<T> {
639686
final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
640687
private Set<T> seenTees = new HashSet<T>();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.tck.TestEnvironment.*;
7+
import org.reactivestreams.tck.support.Function;
78
import org.reactivestreams.tck.support.Optional;
89
import org.testng.SkipException;
910
import org.testng.annotations.BeforeMethod;
@@ -808,8 +809,4 @@ public void notVerified(String message) {
808809
throw new SkipException(message);
809810
}
810811

811-
private interface Function<In, Out> {
812-
public Out apply(In in) throws Throwable;
813-
}
814-
815812
}

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ public void expectCompletion(long timeoutMillis, String msg) throws InterruptedE
667667

668668
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
669669
public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
670-
E err = expectError(expected);
670+
final E err = expectError(expected);
671671
String message = err.getMessage();
672672
assertTrue(message.contains(requiredMessagePart),
673673
String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
@@ -680,7 +680,10 @@ public <E extends Throwable> E expectError(Class<E> expected) throws Interrupted
680680
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
681681
public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
682682
error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
683-
if (expected.isInstance(error.value())) {
683+
if (error.value() == null) {
684+
env.flop(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
685+
return null; // make compiler happy
686+
} else if (expected.isInstance(error.value())) {
684687
return (E) error.value();
685688
} else {
686689
env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.reactivestreams.tck.support;
2+
3+
public interface Function<In, Out> {
4+
public Out apply(In in) throws Throwable;
5+
}

0 commit comments

Comments
 (0)