Skip to content

Commit 9efb141

Browse files
committed
+tck #181 provides default helperPublisher for subscriber tests
+ When subscriber verifications are extended using Integer we can use the existing publisher implementations from examples to drive the tests. This allows implementers who only care about implementing Subscribers to not care at all about implementing Publishers + In case the subscriber verification is extended using a custom signal type (defined as "not Integer") we do not magically generate a Publisher of this type, but instead fail the tests telling the implementer that he/she will need to implement a custom helperPublisher which is exactly what we have up until now always demanded from implementations. + We also provide tests to verify the test failures work as expected, and that the messages are indeed helpful. + Updated TCK README.md to clarify when to implement a custom publisher + fixed mistakenly added `<T>` on NumberPublisher - it is bound to Ints + TCK now depends on examples, as it uses the well documented async publisher to provide the default publisher for numbers. As well as in order to allow implementers to reuse those publishers when implementing custom Publishers to drive their Subscriber specs.
1 parent 317593f commit 9efb141

File tree

11 files changed

+433
-58
lines changed

11 files changed

+433
-58
lines changed

.travis.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
language: java
22
script:
3-
- ./gradlew check
3+
- ./gradlew check
44
cache:
55
directories:
66
- $HOME/.gradle
77
jdk:
8-
- openjdk6
8+
- openjdk6
99
env:
1010
global:
1111
- TERM=dumb

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import org.reactivestreams.Publisher;
64
import org.reactivestreams.Subscriber;
75
import org.reactivestreams.tck.SubscriberBlackboxVerification;
@@ -31,17 +29,11 @@ public AsyncSubscriberTest() {
3129

3230
@Override public Subscriber<Integer> createSubscriber() {
3331
return new AsyncSubscriber<Integer>(e) {
34-
private long acc;
3532
@Override protected boolean whenNext(final Integer element) {
3633
return true;
3734
}
3835
};
3936
}
40-
@SuppressWarnings("unchecked")
41-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
42-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
43-
else return new NumberIterablePublisher(0, (int)elements, e);
44-
}
4537

4638
@Test public void testAccumulation() throws InterruptedException {
4739

@@ -60,7 +52,7 @@ public AsyncSubscriberTest() {
6052
}
6153
};
6254

63-
new NumberIterablePublisher<Integer>(0, 10, e).subscribe(sub);
55+
new NumberIterablePublisher(0, 10, e).subscribe(sub);
6456
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
6557
assertEquals(i.get(), 45);
6658
}

examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
5-
import org.reactivestreams.Publisher;
63
import org.reactivestreams.Subscriber;
74
import org.reactivestreams.tck.SubscriberBlackboxVerification;
85
import org.reactivestreams.tck.TestEnvironment;
6+
import org.testng.annotations.AfterClass;
7+
import org.testng.annotations.BeforeClass;
98
import org.testng.annotations.Test;
109

11-
import org.testng.annotations.BeforeClass;
12-
import org.testng.annotations.AfterClass;
13-
import java.util.concurrent.Executors;
1410
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
1512

1613
@Test // Must be here for TestNG to find and run this, do not remove
1714
public class SyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
@@ -39,9 +36,5 @@ public SyncSubscriberTest() {
3936
}
4037
};
4138
}
42-
@SuppressWarnings("unchecked")
43-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
44-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
45-
else return new NumberIterablePublisher(0, (int)elements, e);
46-
}
39+
4740
}

tck/README.md

+30-10
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,36 @@ Subscriber rules Verification is split up into two files (styles) of tests.
178178

179179
The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.
180180

181+
### Helper Publisher implementations
182+
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
183+
both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation.
184+
185+
For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the
186+
SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification<Integer>`.
187+
The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements,
188+
thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement
189+
an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers
190+
which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher.
191+
192+
If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification<Message>`,
193+
*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message`
194+
(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`)
195+
contains some information and directs the implementer towards implementing a custom helper publisher,
196+
which is done by overriding the `Publisher<T> createHelperPublisher(long elements)` method:
197+
198+
```java
199+
@Override public Publisher<Message> createHelperPublisher(long elements) {
200+
return new Publisher<Message>() { /* IMPL HERE */ };
201+
}
202+
```
203+
204+
Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK
205+
should be driven using `Integer` elements as default publishers are already implemented for this type. If the
206+
`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher<T>` that will
207+
be able to signal the required element types. It is of course both possible and recommended to re-use existing
208+
implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of
209+
such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215)
210+
181211
### Subscriber Blackbox Verification
182212

183213
Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
@@ -203,11 +233,6 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
203233
public Subscriber<Integer> createSubscriber() {
204234
return new MySubscriber<Integer>();
205235
}
206-
207-
@Override
208-
public Publisher<Integer> createHelperPublisher(long elements) {
209-
return new MyRangePublisher<Integer>(1, elements);
210-
}
211236
}
212237
```
213238

@@ -283,11 +308,6 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
283308
};
284309
}
285310

286-
@Override
287-
public Publisher<Integer> createHelperPublisher(long elements) {
288-
return new MyRangePublisher<Integer>(1, elements);
289-
}
290-
291311
}
292312
```
293313

tck/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ description = 'reactive-streams-tck'
22
dependencies {
33
compile group: 'org.testng', name: 'testng', version:'5.14.10'
44
compile project(':reactive-streams')
5+
compile project(':reactive-streams-examples')
56
}
67
test.useTestNG()

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+69-7
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,24 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.example.unicast.InfiniteIncrementNumberPublisher;
7+
import org.reactivestreams.example.unicast.NumberIterablePublisher;
68
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
79
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
810
import org.reactivestreams.tck.support.Optional;
911
import org.reactivestreams.tck.support.TestException;
12+
import org.reactivestreams.tck.support.UnableToProvidePublisherException;
1013
import org.testng.SkipException;
14+
import org.testng.annotations.AfterClass;
15+
import org.testng.annotations.BeforeClass;
1116
import org.testng.annotations.BeforeMethod;
17+
import org.testng.annotations.BeforeTest;
1218
import org.testng.annotations.Test;
1319

20+
import java.lang.reflect.ParameterizedType;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
1424
import static org.reactivestreams.tck.Annotations.NotVerified;
1525
import static org.reactivestreams.tck.Annotations.Required;
1626
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
@@ -34,28 +44,80 @@ protected SubscriberBlackboxVerification(TestEnvironment env) {
3444
this.env = env;
3545
}
3646

47+
// USER API
48+
3749
/**
3850
* This is the main method you must implement in your test incarnation.
3951
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
4052
*/
4153
public abstract Subscriber<T> createSubscriber();
4254

55+
// ENV SETUP
56+
57+
/**
58+
* Executor service used by the default provided asynchronous Publisher.
59+
* @see #createHelperPublisher(long)
60+
*/
61+
private ExecutorService publisherExecutor;
62+
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
63+
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
64+
public ExecutorService publisherExecutorService() { return publisherExecutor; }
65+
66+
/**
67+
* Returns the Class of the {@code <T>} type parameter with which this SubscriberVerification was instanciated.
68+
* This is used to validate if the default provided helper {@link Publisher} can drive the tests for the tested
69+
* {@link Subscriber} or not.
70+
*
71+
* @see #createHelperPublisher(long)
72+
*/
73+
@SuppressWarnings("unchecked")
74+
public Class<T> elementClass() {
75+
return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
76+
}
77+
4378
/**
44-
* Helper method required for generating test elements.
45-
* It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements.
79+
* Validates if the required element type is able to be signalled by the default Publisher.
4680
* <p>
47-
* It also must treat the following numbers of elements in these specific ways:
81+
* If it is able to create such Publisher returns {@code true},
82+
* otherwise throws an {@link org.reactivestreams.tck.support.UnableToProvidePublisherException}.
83+
*/
84+
@BeforeTest public boolean validateAbilityToProvidePublisher() {
85+
if (!elementClass().isAssignableFrom(Integer.class)) {
86+
throw new UnableToProvidePublisherException(String.format("Unable to provide Publisher of `%s` elements (can only emit `java.lang.Integer`)!", elementClass().getCanonicalName()));
87+
} else return true;
88+
}
89+
90+
/**
91+
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
92+
* <p>
93+
* By default an <b>Asynchronous Integer signalling Publisher</b> is provided, however if this SubscriberVerification
94+
* was created using a different type (and your Subscriber is only able to consume those signals (e.g. "MyMessage"))
95+
* the default provided Publisher will NOT be able to generate those messages and will fail the entire test class and
96+
* asking you to provide a custom {@link Publisher} which is able to generate elements of type {@code T}.
97+
* <p>
98+
* When implementing a custom helper Publisher it MUST emit the exact number of elements asked for (via the {@code elements}
99+
* parameter) and MUST also must treat the following numbers of elements in these specific ways:
48100
* <ul>
49101
* <li>
50-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
102+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
51103
* </li>
52104
* <li>
53-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
54-
* In other words, it should represent a "completed stream".
105+
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
106+
* In other words, it should represent a "completed stream".
55107
* </li>
56108
* </ul>
57109
*/
58-
public abstract Publisher<T> createHelperPublisher(long elements);
110+
@SuppressWarnings("unchecked")
111+
public Publisher<T> createHelperPublisher(long elements) {
112+
if (validateAbilityToProvidePublisher()) {
113+
// we can safely provide the Integer based Publishers
114+
if (elements > Integer.MAX_VALUE) {
115+
return (Publisher<T>) new InfiniteIncrementNumberPublisher(publisherExecutorService());
116+
} else {
117+
return (Publisher<T>) new NumberIterablePublisher(0, (int)elements, publisherExecutorService());
118+
}
119+
} else throw new RuntimeException("Unreachable. `validateAbilityToProvidePublisher` should have thrown!");
120+
}
59121

60122
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
61123

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+68-17
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,25 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.example.unicast.InfiniteIncrementNumberPublisher;
7+
import org.reactivestreams.example.unicast.NumberIterablePublisher;
68
import org.reactivestreams.tck.Annotations.Additional;
79
import org.reactivestreams.tck.TestEnvironment.*;
810
import org.reactivestreams.tck.support.Function;
911
import org.reactivestreams.tck.support.Optional;
1012
import org.reactivestreams.tck.support.TestException;
13+
import org.reactivestreams.tck.support.UnableToProvidePublisherException;
1114
import org.testng.SkipException;
15+
import org.testng.annotations.AfterClass;
16+
import org.testng.annotations.BeforeClass;
1217
import org.testng.annotations.BeforeMethod;
18+
import org.testng.annotations.BeforeTest;
1319
import org.testng.annotations.Test;
1420

21+
import java.lang.reflect.ParameterizedType;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
1525
import static org.reactivestreams.tck.Annotations.NotVerified;
1626
import static org.reactivestreams.tck.Annotations.Required;
1727
import static org.testng.Assert.assertEquals;
@@ -31,6 +41,8 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
3141
this.env = env;
3242
}
3343

44+
// USER API
45+
3446
/**
3547
* This is the main method you must implement in your test incarnation.
3648
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
@@ -40,32 +52,71 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
4052
*/
4153
public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
4254

55+
// ENV SETUP
56+
57+
/**
58+
* Executor service used by the default provided asynchronous Publisher.
59+
* @see #createHelperPublisher(long)
60+
*/
61+
private ExecutorService publisherExecutor;
62+
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
63+
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
64+
public ExecutorService publisherExecutorService() { return publisherExecutor; }
65+
4366
/**
44-
* Helper method required for generating test elements.
45-
* It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements.
67+
* Returns the Class of the {@code <T>} type parameter with which this SubscriberVerification was instanciated.
68+
* This is used to validate if the default provided helper {@link Publisher} can drive the tests for the tested
69+
* {@link Subscriber} or not.
70+
*
71+
* @see #createHelperPublisher(long)
72+
*/
73+
@SuppressWarnings("unchecked")
74+
public Class<T> elementClass() {
75+
return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
76+
}
77+
78+
/**
79+
* Validates if the required element type is able to be signalled by the default Publisher.
4680
* <p>
47-
* It also must treat the following numbers of elements in these specific ways:
81+
* If it is able to create such Publisher returns {@code true},
82+
* otherwise throws an {@link org.reactivestreams.tck.support.UnableToProvidePublisherException}.
83+
*/
84+
@BeforeTest public boolean validateAbilityToProvidePublisher() {
85+
if (!elementClass().isAssignableFrom(Integer.class)) {
86+
throw new UnableToProvidePublisherException(String.format("Unable to provide Publisher of `%s` elements (can only emit `java.lang.Integer`)!", elementClass().getCanonicalName()));
87+
} else return true;
88+
}
89+
90+
/**
91+
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
92+
* <p>
93+
* By default an <b>Asynchronous Integer signalling Publisher</b> is provided, however if this SubscriberVerification
94+
* was created using a different type (and your Subscriber is only able to consume those signals (e.g. "MyMessage"))
95+
* the default provided Publisher will NOT be able to generate those messages and will fail the entire test class and
96+
* asking you to provide a custom {@link Publisher} which is able to generate elements of type {@code T}.
97+
* <p>
98+
* When implementing a custom helper Publisher it MUST emit the exact number of elements asked for (via the {@code elements}
99+
* parameter) and MUST also must treat the following numbers of elements in these specific ways:
48100
* <ul>
49101
* <li>
50-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
102+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
51103
* </li>
52104
* <li>
53-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
54-
* In other words, it should represent a "completed stream".
105+
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
106+
* In other words, it should represent a "completed stream".
55107
* </li>
56108
* </ul>
57109
*/
58-
public abstract Publisher<T> createHelperPublisher(long elements);
59-
60-
/**
61-
* Used to break possibly infinite wait-loops.
62-
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
63-
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
64-
*
65-
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
66-
*/
67-
public long maxOnNextSignalsInTest() {
68-
return 100;
110+
@SuppressWarnings("unchecked")
111+
public Publisher<T> createHelperPublisher(long elements) {
112+
if (validateAbilityToProvidePublisher()) {
113+
// we can safely provide the Integer based Publishers
114+
if (elements > Integer.MAX_VALUE) {
115+
return (Publisher<T>) new InfiniteIncrementNumberPublisher(publisherExecutorService());
116+
} else {
117+
return (Publisher<T>) new NumberIterablePublisher(0, (int)elements, publisherExecutorService());
118+
}
119+
} else throw new RuntimeException("Unreachable. `validateAbilityToProvidePublisher` should have thrown!");
69120
}
70121

71122
////////////////////// TEST ENV CLEANUP /////////////////////////////////////

0 commit comments

Comments
 (0)