Skip to content

Commit 737c1f2

Browse files
committed
Merge pull request #193 from ktoso/provide-publisher-ktoso-rebased
+TCK: Provide helper publisher (rebased)
2 parents 93415d1 + fdf9a78 commit 737c1f2

22 files changed

+569
-189
lines changed

.travis.yml

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
language: java
22
script:
3-
- ./gradlew check
3+
- ./gradlew check
44
cache:
55
directories:
66
- $HOME/.gradle
77
jdk:
8-
- openjdk6
8+
- openjdk6
99
env:
1010
global:
1111
- TERM=dumb
12+
- DEFAULT_TIMEOUT_MILLIS=300
13+
- PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300

build.gradle

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ subprojects {
2525
}
2626
}
2727

28+
tasks.withType(Test) {
29+
testLogging {
30+
exceptionFormat "full"
31+
events "failed", "started"
32+
}
33+
}
34+
2835
repositories {
2936
mavenCentral()
3037
}

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import org.reactivestreams.Publisher;
43
import org.reactivestreams.Subscriber;
54
import org.reactivestreams.Subscription;
65

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+7-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import org.reactivestreams.Publisher;
64
import org.reactivestreams.Subscriber;
75
import org.reactivestreams.tck.SubscriberBlackboxVerification;
@@ -19,29 +17,22 @@
1917

2018
@Test // Must be here for TestNG to find and run this, do not remove
2119
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
22-
final static long DefaultTimeoutMillis = 100;
2320

2421
private ExecutorService e;
2522
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
2623
@AfterClass void after() { if (e != null) e.shutdown(); }
2724

2825
public AsyncSubscriberTest() {
29-
super(new TestEnvironment(DefaultTimeoutMillis));
26+
super(new TestEnvironment());
3027
}
3128

3229
@Override public Subscriber<Integer> createSubscriber() {
3330
return new AsyncSubscriber<Integer>(e) {
34-
private long acc;
3531
@Override protected boolean whenNext(final Integer element) {
3632
return true;
3733
}
3834
};
3935
}
40-
@SuppressWarnings("unchecked")
41-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
42-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
43-
else return new NumberIterablePublisher(0, (int)elements, e);
44-
}
4536

4637
@Test public void testAccumulation() throws InterruptedException {
4738

@@ -61,7 +52,12 @@ public AsyncSubscriberTest() {
6152
};
6253

6354
new NumberIterablePublisher(0, 10, e).subscribe(sub);
64-
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
55+
latch.await(env.defaultTimeoutMillis() * 10, TimeUnit.MILLISECONDS);
6556
assertEquals(i.get(), 45);
6657
}
58+
59+
@Override public Integer createElement(int element) {
60+
return element;
61+
}
62+
6763
}

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
@Test // Must be here for TestNG to find and run this, do not remove
1616
public class IterablePublisherTest extends PublisherVerification<Integer> {
1717

18-
final static long DefaultTimeoutMillis = 100;
19-
final static long PublisherReferenceGCTimeoutMillis = 300;
20-
2118
private ExecutorService e;
2219
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
2320
@AfterClass void after() { if (e != null) e.shutdown(); }
2421

2522
public IterablePublisherTest() {
26-
super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis);
23+
super(new TestEnvironment());
2724
}
2825

2926
@SuppressWarnings("unchecked")
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,24 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
5-
import org.reactivestreams.Publisher;
63
import org.reactivestreams.Subscriber;
74
import org.reactivestreams.tck.SubscriberBlackboxVerification;
85
import org.reactivestreams.tck.TestEnvironment;
6+
import org.testng.annotations.AfterClass;
7+
import org.testng.annotations.BeforeClass;
98
import org.testng.annotations.Test;
109

11-
import org.testng.annotations.BeforeClass;
12-
import org.testng.annotations.AfterClass;
13-
import java.util.concurrent.Executors;
1410
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
1512

1613
@Test // Must be here for TestNG to find and run this, do not remove
1714
public class SyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
1815

19-
final static long DefaultTimeoutMillis = 100;
20-
2116
private ExecutorService e;
2217
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
2318
@AfterClass void after() { if (e != null) e.shutdown(); }
2419

2520
public SyncSubscriberTest() {
26-
super(new TestEnvironment(DefaultTimeoutMillis));
21+
super(new TestEnvironment());
2722
}
2823

2924
@Override public Subscriber<Integer> createSubscriber() {
@@ -39,9 +34,8 @@ public SyncSubscriberTest() {
3934
}
4035
};
4136
}
42-
@SuppressWarnings("unchecked")
43-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
44-
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
45-
else return new NumberIterablePublisher(0, (int)elements, e);
37+
38+
@Override public Integer createElement(int element) {
39+
return element;
4640
}
4741
}

examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111

1212
@Test // Must be here for TestNG to find and run this, do not remove
1313
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
14-
final static long DefaultTimeoutMillis = 200;
15-
final static long PublisherReferenceGCTimeoutMillis = 500;
1614

1715
private ExecutorService e;
1816
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
1917
@AfterClass void after() { if (e != null) e.shutdown(); }
2018

2119
public UnboundedIntegerIncrementPublisherTest() {
22-
super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis);
20+
super(new TestEnvironment());
2321
}
2422

2523
@Override public Publisher<Integer> createPublisher(long elements) {

tck/README.md

+107-15
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ Specification rule abides the following naming convention: `spec###_DESC` where:
5151
```
5252

5353
The prefixes of the names of the test methods are used in order to signify the character of the test. For example, these are the kinds of prefixes you may find:
54-
"required_", "optional_", "stochastic_", "untested_".
54+
"`required_`", "`optional_`", "`stochastic_`", "`untested_`".
5555

5656
Explanations:
5757

@@ -132,11 +132,8 @@ import org.reactivestreams.tck.TestEnvironment;
132132

133133
public class RangePublisherTest extends PublisherVerification<Integer> {
134134

135-
public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
136-
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 1000L;
137-
138135
public RangePublisherTest() {
139-
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
136+
super(new TestEnvironment());
140137
}
141138

142139
@Override
@@ -174,12 +171,83 @@ Notable configuration options include:
174171
* `boundedDepthOfOnNextAndRequestRecursion` – which should only be overridden in case of synchronous Publishers. This number will be used to validate if a
175172
`Subscription` actually solves the "unbounded recursion" problem (Rule 3.3).
176173

174+
### Timeout configuration
175+
Publisher tests make use of two kinds of timeouts, one is the `defaultTimeoutMillis` which corresponds to all methods used
176+
within the TCK which await for something to happen. The other timeout is `publisherReferenceGCTimeoutMillis` which is only used in order to verify
177+
[Rule 3.13](https://github.com/reactive-streams/reactive-streams#3.13) which defines that subscriber references MUST be dropped
178+
by the Publisher.
179+
180+
In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
181+
182+
**Use env variables** to set these timeouts, in which case the you can just:
183+
184+
```bash
185+
export DEFAULT_TIMEOUT_MILLIS=300
186+
export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=500
187+
```
188+
189+
Or **define the timeouts explicitly in code**:
190+
191+
```java
192+
public class RangePublisherTest extends PublisherVerification<Integer> {
193+
194+
public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
195+
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
196+
197+
public RangePublisherTest() {
198+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
199+
}
200+
201+
// ...
202+
}
203+
```
204+
205+
Note that explicitly passed in values take precedence over values provided by the environment
206+
177207
## Subscriber Verification
178208

179209
Subscriber rules Verification is split up into two files (styles) of tests.
180210

181211
The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.
182212

213+
### createElement and Helper Publisher implementations
214+
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
215+
both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation.
216+
The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution.
217+
218+
While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber
219+
may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such
220+
special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of
221+
Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of
222+
elements flowing into the `Subscriber<T>`, as known thanks to `... extends WhiteboxSubscriberVerification<T>`) representing
223+
an element of the stream that will be passed on to the Subscriber.
224+
225+
The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types:
226+
227+
```java
228+
public class MySubscriberTest extends SubscriberBlackboxVerification<Integer> {
229+
230+
// ...
231+
232+
@Override
233+
public Integer createElement(int element) { return element; }
234+
}
235+
```
236+
237+
238+
The `createElement` method MAY be called from multiple
239+
threads, so in case of more complicated implementations, please be aware of this fact.
240+
241+
**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher`
242+
which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your
243+
own Publisher which will then be used by the TCK to drive your Subscriber tests:
244+
245+
```java
246+
@Override public Publisher<Message> createHelperPublisher(long elements) {
247+
return new Publisher<Message>() { /* IMPL HERE */ };
248+
}
249+
```
250+
183251
### Subscriber Blackbox Verification
184252

185253
Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
@@ -195,10 +263,8 @@ import org.reactivestreams.tck.TestEnvironment;
195263

196264
public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVerification<Integer> {
197265

198-
public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
199-
200266
public MySubscriberBlackboxVerificationTest() {
201-
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
267+
super(new TestEnvironment());
202268
}
203269

204270
@Override
@@ -207,8 +273,8 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
207273
}
208274

209275
@Override
210-
public Publisher<Integer> createHelperPublisher(long elements) {
211-
return new MyRangePublisher<Integer>(1, elements);
276+
public Integer createElement(int element) {
277+
return element;
212278
}
213279
}
214280
```
@@ -235,10 +301,8 @@ import org.reactivestreams.tck.TestEnvironment;
235301

236302
public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification<Integer> {
237303

238-
public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
239-
240304
public MySubscriberWhiteboxVerificationTest() {
241-
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
305+
super(new TestEnvironment());
242306
}
243307

244308
@Override
@@ -286,13 +350,41 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
286350
}
287351

288352
@Override
289-
public Publisher<Integer> createHelperPublisher(long elements) {
290-
return new MyRangePublisher<Integer>(1, elements);
353+
public Integer createElement(int element) {
354+
return element;
355+
}
356+
357+
}
358+
```
359+
360+
### Timeout configuration
361+
Similarily to `PublisherVerification`, it is possible to set the timeouts used by the TCK to validate subscriber behaviour.
362+
This can be set either by using env variables or hardcoded explicitly.
363+
364+
**Use env variables** to set the timeout value to be used by the TCK:
365+
366+
```bash
367+
export DEFAULT_TIMEOUT_MILLIS=300
368+
```
369+
370+
Or **define the timeout explicitly in code**:
371+
372+
```java
373+
public class MySubscriberTest extends BlackboxSubscriberVerification<Integer> {
374+
375+
public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
376+
377+
public RangePublisherTest() {
378+
super(new MySubscriberTest(DEFAULT_TIMEOUT_MILLIS));
291379
}
292380

381+
// ...
293382
}
294383
```
295384

385+
Note that hard-coded values *take precedence* over environment set values (!).
386+
387+
296388
## Subscription Verification
297389

298390
Please note that while `Subscription` does **not** have it's own test class, it's rules are validated inside of the `Publisher` and `Subscriber` tests – depending if the Rule demands specific action to be taken by the publishing, or subscribing side of the subscription contract.

tck/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ description = 'reactive-streams-tck'
22
dependencies {
33
compile group: 'org.testng', name: 'testng', version:'5.14.10'
44
compile project(':reactive-streams')
5+
compile project(':reactive-streams-examples')
56
}
67
test.useTestNG()

0 commit comments

Comments
 (0)