Skip to content

Commit fcd5387

Browse files
committed
Allow skipping tests which require more subscribers
if your impl is not able to support multiple subscribers
1 parent 4738a43 commit fcd5387

File tree

5 files changed

+115
-66
lines changed

5 files changed

+115
-66
lines changed

tck/src/main/java/org/reactivestreams/tck/Annotations.java

+10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ private Annotations() {}
3333
static @interface Required {
3434
}
3535

36+
/**
37+
* Used to mark how many subscribers will be used on the publisher under test.
38+
* If an implementation declares it does not support enough subscribers as needed by such test, it will be SKIPPed instead of FAILed.
39+
*/
40+
@Target(ElementType.METHOD)
41+
@Retention(RetentionPolicy.SOURCE)
42+
static @interface Subscribers {
43+
long value() default 1;
44+
}
45+
3646
/**
3747
* Used to mark tests which may be skipped / not implemented by certain implementations.
3848
* These tests can be skipped by returning null from the given factory method.

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+94-60
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import org.reactivestreams.Publisher;
55
import org.reactivestreams.Subscriber;
66
import org.reactivestreams.Subscription;
7+
import org.reactivestreams.tck.Annotations.Subscribers;
78
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
89
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
910
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
1011
import org.reactivestreams.tck.TestEnvironment.Promise;
12+
import org.reactivestreams.tck.support.Function;
1113
import org.testng.annotations.BeforeMethod;
1214
import org.testng.annotations.Test;
1315

@@ -130,17 +132,28 @@ public long maxElementsFromPublisher() {
130132
}
131133

132134
/**
133-
* In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
134-
* {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
135-
* recursive calls to exceed the number returned by this method.
136-
*
137-
* @see <a href="https://github.com/reactive-streams/reactive-streams#3.3">reactive streams spec, rule 3.3</a>
138-
* @see PublisherVerification#spec303_mustNotAllowUnboundedRecursion()
139-
*/
135+
* In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
136+
* {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
137+
* recursive calls to exceed the number returned by this method.
138+
*
139+
* @see <a href="https://github.com/reactive-streams/reactive-streams#3.3">reactive streams spec, rule 3.3</a>
140+
* @see PublisherVerification#spec303_mustNotAllowUnboundedRecursion()
141+
*/
140142
public long boundedDepthOfOnNextAndRequestRecursion() {
141143
return 1;
142144
}
143145

146+
/**
147+
* Describes the tested implementation in terms of how many subscribers they can support.
148+
* Some tests require the {@code Publisher} under test to support multiple Subscribers,
149+
* yet the spec does not require all publishers to be able to do so, thus – if an implementation
150+
* supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
151+
* you MUST return that number from this method by overriding it.
152+
*/
153+
public long maxSupportedSubscribers() {
154+
return Long.MAX_VALUE;
155+
}
156+
144157
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
145158

146159
@BeforeMethod
@@ -310,31 +323,37 @@ public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue()
310323

311324
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
312325
// for multiple subscribers
313-
@Test
314-
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
315-
new TestSetup(env, processorBufferSize) {{
316-
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
317-
env.subscribe(processor, sub1);
326+
@Test @Subscribers(2)
327+
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
328+
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
329+
@Override
330+
public TestSetup apply(Long aLong) throws Throwable {
331+
return new TestSetup(env, processorBufferSize) {{
332+
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
333+
env.subscribe(processor, sub1);
318334

319-
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
320-
env.subscribe(processor, sub2);
335+
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
336+
env.subscribe(processor, sub2);
321337

322-
sub1.request(1);
323-
expectRequest();
324-
final T x = sendNextTFromUpstream();
325-
expectNextElement(sub1, x);
326-
sub1.request(1);
338+
sub1.request(1);
339+
expectRequest();
340+
final T x = sendNextTFromUpstream();
341+
expectNextElement(sub1, x);
342+
sub1.request(1);
327343

328-
// sub1 has received one element, and has one demand pending
329-
// sub2 has not yet requested anything
344+
// sub1 has received one element, and has one demand pending
345+
// sub2 has not yet requested anything
330346

331-
final Exception ex = new RuntimeException("Test exception");
332-
sendError(ex);
333-
sub1.expectError(ex);
334-
sub2.expectError(ex);
347+
final Exception ex = new RuntimeException("Test exception");
348+
sendError(ex);
349+
sub1.expectError(ex);
350+
sub2.expectError(ex);
335351

336-
env.verifyNoAsyncErrors();
337-
}};
352+
env.verifyNoAsyncErrors();
353+
}};
354+
355+
}
356+
});
338357
}
339358

340359
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
@@ -580,49 +599,54 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
580599

581600
// A Processor
582601
// must trigger `requestFromUpstream` for elements that have been requested 'long ago'
583-
@Test
584-
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Exception {
585-
new TestSetup(env, processorBufferSize) {{
586-
ManualSubscriber<T> sub1 = newSubscriber();
587-
sub1.request(20);
602+
@Test @Subscribers(2)
603+
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
604+
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
605+
@Override
606+
public TestSetup apply(Long subscribers) throws Throwable {
607+
return new TestSetup(env, processorBufferSize) {{
608+
ManualSubscriber<T> sub1 = newSubscriber();
609+
sub1.request(20);
588610

589-
long totalRequests = expectRequest();
590-
final T x = sendNextTFromUpstream();
591-
expectNextElement(sub1, x);
611+
long totalRequests = expectRequest();
612+
final T x = sendNextTFromUpstream();
613+
expectNextElement(sub1, x);
592614

593-
if (totalRequests == 1) {
594-
totalRequests += expectRequest();
595-
}
596-
final T y = sendNextTFromUpstream();
597-
expectNextElement(sub1, y);
615+
if (totalRequests == 1) {
616+
totalRequests += expectRequest();
617+
}
618+
final T y = sendNextTFromUpstream();
619+
expectNextElement(sub1, y);
598620

599-
if (totalRequests == 2) {
600-
totalRequests += expectRequest();
601-
}
621+
if (totalRequests == 2) {
622+
totalRequests += expectRequest();
623+
}
602624

603-
ManualSubscriber<T> sub2 = newSubscriber();
625+
final ManualSubscriber<T> sub2 = newSubscriber();
604626

605-
// sub1 now has 18 pending
606-
// sub2 has 0 pending
627+
// sub1 now has 18 pending
628+
// sub2 has 0 pending
607629

608-
final T z = sendNextTFromUpstream();
609-
expectNextElement(sub1, z);
610-
sub2.expectNone(); // since sub2 hasn't requested anything yet
630+
final T z = sendNextTFromUpstream();
631+
expectNextElement(sub1, z);
632+
sub2.expectNone(); // since sub2 hasn't requested anything yet
611633

612-
sub2.request(1);
613-
expectNextElement(sub2, z);
634+
sub2.request(1);
635+
expectNextElement(sub2, z);
614636

615-
if (totalRequests == 3) {
616-
expectRequest();
617-
}
637+
if (totalRequests == 3) {
638+
expectRequest();
639+
}
618640

619-
// to avoid error messages during test harness shutdown
620-
sendCompletion();
621-
sub1.expectCompletion(env.defaultTimeoutMillis());
622-
sub2.expectCompletion(env.defaultTimeoutMillis());
641+
// to avoid error messages during test harness shutdown
642+
sendCompletion();
643+
sub1.expectCompletion(env.defaultTimeoutMillis());
644+
sub2.expectCompletion(env.defaultTimeoutMillis());
623645

624-
env.verifyNoAsyncErrors();
625-
}};
646+
env.verifyNoAsyncErrors();
647+
}};
648+
}
649+
});
626650
}
627651

628652
/////////////////////// TEST INFRASTRUCTURE //////////////////////
@@ -635,6 +659,16 @@ public void notVerified(String message) {
635659
publisherVerification.notVerified(message);
636660
}
637661

662+
/**
663+
* Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
664+
*/
665+
public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
666+
if (requiredSubscribersSupport > maxSupportedSubscribers())
667+
notVerified("The Publisher under test only supports " +maxSupportedSubscribers()+ " subscribers, " +
668+
"while this test requires at least " + requiredSubscribersSupport + "to run.");
669+
else body.apply(requiredSubscribersSupport);
670+
}
671+
638672
public abstract class TestSetup extends ManualPublisher<T> {
639673
final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
640674
private Set<T> seenTees = new HashSet<T>();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.tck.TestEnvironment.*;
7+
import org.reactivestreams.tck.support.Function;
78
import org.reactivestreams.tck.support.Optional;
89
import org.testng.SkipException;
910
import org.testng.annotations.BeforeMethod;
@@ -808,8 +809,4 @@ public void notVerified(String message) {
808809
throw new SkipException(message);
809810
}
810811

811-
private interface Function<In, Out> {
812-
public Out apply(In in) throws Throwable;
813-
}
814-
815812
}

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ public void expectCompletion(long timeoutMillis, String msg) throws InterruptedE
667667

668668
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
669669
public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
670-
E err = expectError(expected);
670+
final E err = expectError(expected);
671671
String message = err.getMessage();
672672
assertTrue(message.contains(requiredMessagePart),
673673
String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
@@ -680,7 +680,10 @@ public <E extends Throwable> E expectError(Class<E> expected) throws Interrupted
680680
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
681681
public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
682682
error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
683-
if (expected.isInstance(error.value())) {
683+
if (error.value() == null) {
684+
env.flop(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
685+
return null; // make compiler happy
686+
} else if (expected.isInstance(error.value())) {
684687
return (E) error.value();
685688
} else {
686689
env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.reactivestreams.tck.support;
2+
3+
public interface Function<In, Out> {
4+
public Out apply(In in) throws Throwable;
5+
}

0 commit comments

Comments
 (0)