Skip to content

Commit 3c72b72

Browse files
ktosoviktorklang
authored andcommitted
+tck #181 provides default helperPublisher for subscriber tests
+ When subscriber verifications are extended using Integer we can use the existing publisher implementations from examples to drive the tests. This allows implementers who only care about implementing Subscribers to not care at all about implementing Publishers + In case the subscriber verification is extended using a custom signal type (defined as "not Integer") we do not magically generate a Publisher of this type, but instead fail the tests telling the implementer that he/she will need to implement a custom helperPublisher which is exactly what we have up until now always demanded from implementations. + We also provide tests to verify the test failures work as expected, and that the messages are indeed helpful. + Updated TCK README.md to clarify when to implement a custom publisher + fixed mistakenly added `<T>` on NumberPublisher - it is bound to Ints + TCK now depends on examples, as it uses the well documented async publisher to provide the default publisher for numbers. As well as in order to allow implementers to reuse those publishers when implementing custom Publishers to drive their Subscriber specs. ! Moved away from createHelper to createElement style of testing subscribers
1 parent 93415d1 commit 3c72b72

17 files changed

+288
-106
lines changed

.travis.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
language: java
22
script:
3-
- ./gradlew check
3+
- ./gradlew check
44
cache:
55
directories:
66
- $HOME/.gradle
77
jdk:
8-
- openjdk6
8+
- openjdk6
99
env:
1010
global:
1111
- TERM=dumb

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import org.reactivestreams.Publisher;
64
import org.reactivestreams.Subscriber;
75
import org.reactivestreams.tck.SubscriberBlackboxVerification;
@@ -31,17 +29,11 @@ public AsyncSubscriberTest() {
3129

3230
@Override public Subscriber<Integer> createSubscriber() {
3331
return new AsyncSubscriber<Integer>(e) {
34-
private long acc;
3532
@Override protected boolean whenNext(final Integer element) {
3633
return true;
3734
}
3835
};
3936
}
40-
@SuppressWarnings("unchecked")
41-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
42-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
43-
else return new NumberIterablePublisher(0, (int)elements, e);
44-
}
4537

4638
@Test public void testAccumulation() throws InterruptedException {
4739

@@ -64,4 +56,8 @@ public AsyncSubscriberTest() {
6456
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
6557
assertEquals(i.get(), 45);
6658
}
59+
60+
@Override public Integer createElement(int element) {
61+
return element;
62+
}
6763
}

examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
5-
import org.reactivestreams.Publisher;
63
import org.reactivestreams.Subscriber;
74
import org.reactivestreams.tck.SubscriberBlackboxVerification;
85
import org.reactivestreams.tck.TestEnvironment;
6+
import org.testng.annotations.AfterClass;
7+
import org.testng.annotations.BeforeClass;
98
import org.testng.annotations.Test;
109

11-
import org.testng.annotations.BeforeClass;
12-
import org.testng.annotations.AfterClass;
13-
import java.util.concurrent.Executors;
1410
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
1512

1613
@Test // Must be here for TestNG to find and run this, do not remove
1714
public class SyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
@@ -39,9 +36,8 @@ public SyncSubscriberTest() {
3936
}
4037
};
4138
}
42-
@SuppressWarnings("unchecked")
43-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
44-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
45-
else return new NumberIterablePublisher(0, (int)elements, e);
39+
40+
@Override public Integer createElement(int element) {
41+
return element;
4642
}
4743
}

tck/README.md

+30-10
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,36 @@ Subscriber rules Verification is split up into two files (styles) of tests.
180180

181181
The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.
182182

183+
### Helper Publisher implementations
184+
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
185+
both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation.
186+
187+
For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the
188+
SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification<Integer>`.
189+
The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements,
190+
thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement
191+
an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers
192+
which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher.
193+
194+
If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification<Message>`,
195+
*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message`
196+
(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`)
197+
contains some information and directs the implementer towards implementing a custom helper publisher,
198+
which is done by overriding the `Publisher<T> createHelperPublisher(long elements)` method:
199+
200+
```java
201+
@Override public Publisher<Message> createHelperPublisher(long elements) {
202+
return new Publisher<Message>() { /* IMPL HERE */ };
203+
}
204+
```
205+
206+
Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK
207+
should be driven using `Integer` elements as default publishers are already implemented for this type. If the
208+
`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher<T>` that will
209+
be able to signal the required element types. It is of course both possible and recommended to re-use existing
210+
implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of
211+
such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215)
212+
183213
### Subscriber Blackbox Verification
184214

185215
Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
@@ -205,11 +235,6 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
205235
public Subscriber<Integer> createSubscriber() {
206236
return new MySubscriber<Integer>();
207237
}
208-
209-
@Override
210-
public Publisher<Integer> createHelperPublisher(long elements) {
211-
return new MyRangePublisher<Integer>(1, elements);
212-
}
213238
}
214239
```
215240

@@ -285,11 +310,6 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
285310
};
286311
}
287312

288-
@Override
289-
public Publisher<Integer> createHelperPublisher(long elements) {
290-
return new MyRangePublisher<Integer>(1, elements);
291-
}
292-
293313
}
294314
```
295315

tck/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ description = 'reactive-streams-tck'
22
dependencies {
33
compile group: 'org.testng', name: 'testng', version:'5.14.10'
44
compile project(':reactive-streams')
5+
compile project(':reactive-streams-examples')
56
}
67
test.useTestNG()

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.util.HashSet;
1818
import java.util.Set;
1919

20-
public abstract class IdentityProcessorVerification<T> implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
20+
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
2121

2222
private final TestEnvironment env;
2323

@@ -65,6 +65,10 @@ public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
6565
return IdentityProcessorVerification.this.createSubscriber(probe);
6666
}
6767

68+
@Override public T createElement(int element) {
69+
return IdentityProcessorVerification.this.createElement(element);
70+
}
71+
6872
@Override
6973
public Publisher<T> createHelperPublisher(long elements) {
7074
return IdentityProcessorVerification.this.createHelperPublisher(elements);
@@ -109,15 +113,24 @@ public boolean skipStochasticTests() {
109113
public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
110114

111115
/**
112-
* Helper method required for running the Publisher rules against a Publisher.
113-
* It must create a Publisher for a stream with exactly the given number of elements.
114-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
115-
*
116-
* The stream must not produce the same element twice (in case of an infinite stream this requirement
117-
* is relaxed to only apply to the elements that are actually requested during all tests).
118-
*
119-
* @param elements exact number of elements this publisher should emit,
120-
* unless equal to {@code Long.MAX_VALUE} in which case the stream should be effectively infinite
116+
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
117+
* <p>
118+
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
119+
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
120+
* your Subscriber is able to consume.
121+
* <p>
122+
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
123+
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
124+
* (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
125+
* <ul>
126+
* <li>
127+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
128+
* </li>
129+
* <li>
130+
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
131+
* In other words, it should represent a "completed stream".
132+
* </li>
133+
* </ul>
121134
*/
122135
public abstract Publisher<T> createHelperPublisher(long elements);
123136

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@
99
import org.reactivestreams.tck.support.TestException;
1010
import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules;
1111
import org.testng.SkipException;
12+
import org.testng.annotations.AfterClass;
13+
import org.testng.annotations.BeforeClass;
1214
import org.testng.annotations.BeforeMethod;
1315
import org.testng.annotations.Test;
1416

17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
1519
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
1620

1721
/**
@@ -25,36 +29,32 @@
2529
* @see org.reactivestreams.Subscriber
2630
* @see org.reactivestreams.Subscription
2731
*/
28-
public abstract class SubscriberBlackboxVerification<T> implements SubscriberBlackboxVerificationRules {
32+
public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> implements SubscriberBlackboxVerificationRules {
2933

3034
private final TestEnvironment env;
3135

3236
protected SubscriberBlackboxVerification(TestEnvironment env) {
3337
this.env = env;
3438
}
3539

40+
// USER API
41+
3642
/**
3743
* This is the main method you must implement in your test incarnation.
3844
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
3945
*/
4046
public abstract Subscriber<T> createSubscriber();
4147

48+
// ENV SETUP
49+
4250
/**
43-
* Helper method required for generating test elements.
44-
* It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements.
45-
* <p>
46-
* It also must treat the following numbers of elements in these specific ways:
47-
* <ul>
48-
* <li>
49-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
50-
* </li>
51-
* <li>
52-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
53-
* In other words, it should represent a "completed stream".
54-
* </li>
55-
* </ul>
51+
* Executor service used by the default provided asynchronous Publisher.
52+
* @see #createHelperPublisher(long)
5653
*/
57-
public abstract Publisher<T> createHelperPublisher(long elements);
54+
private ExecutorService publisherExecutor;
55+
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
56+
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
57+
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
5858

5959
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
6060

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+15-26
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.tck.TestEnvironment.*;
7-
import org.reactivestreams.tck.support.Function;
87
import org.reactivestreams.tck.support.Optional;
98
import org.reactivestreams.tck.support.TestException;
109
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
1110
import org.testng.SkipException;
11+
import org.testng.annotations.AfterClass;
12+
import org.testng.annotations.BeforeClass;
1213
import org.testng.annotations.BeforeMethod;
1314
import org.testng.annotations.Test;
1415

16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
1519
import static org.testng.Assert.assertEquals;
1620
import static org.testng.Assert.assertTrue;
1721

@@ -21,14 +25,16 @@
2125
* @see org.reactivestreams.Subscriber
2226
* @see org.reactivestreams.Subscription
2327
*/
24-
public abstract class SubscriberWhiteboxVerification<T> implements SubscriberWhiteboxVerificationRules {
28+
public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T> implements SubscriberWhiteboxVerificationRules {
2529

2630
private final TestEnvironment env;
2731

2832
protected SubscriberWhiteboxVerification(TestEnvironment env) {
2933
this.env = env;
3034
}
3135

36+
// USER API
37+
3238
/**
3339
* This is the main method you must implement in your test incarnation.
3440
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
@@ -38,33 +44,16 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
3844
*/
3945
public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
4046

41-
/**
42-
* Helper method required for generating test elements.
43-
* It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements.
44-
* <p>
45-
* It also must treat the following numbers of elements in these specific ways:
46-
* <ul>
47-
* <li>
48-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
49-
* </li>
50-
* <li>
51-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
52-
* In other words, it should represent a "completed stream".
53-
* </li>
54-
* </ul>
55-
*/
56-
public abstract Publisher<T> createHelperPublisher(long elements);
47+
// ENV SETUP
5748

5849
/**
59-
* Used to break possibly infinite wait-loops.
60-
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
61-
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
62-
*
63-
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
50+
* Executor service used by the default provided asynchronous Publisher.
51+
* @see #createHelperPublisher(long)
6452
*/
65-
public long maxOnNextSignalsInTest() {
66-
return 100;
67-
}
53+
private ExecutorService publisherExecutor;
54+
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
55+
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
56+
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
6857

6958
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
7059

0 commit comments

Comments
 (0)