Skip to content

Commit cf715fa

Browse files
committed
+tck 2.9 blackbox verification should fully drive publisher
It should not use the helper publisher as it may signal asynchronously as well as onComplete in undefined ways - we need to control the publisher in this case because of the very specific test scenario
1 parent e3c4e58 commit cf715fa

File tree

7 files changed

+71
-41
lines changed

7 files changed

+71
-41
lines changed

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
@Test // Must be here for TestNG to find and run this, do not remove
1919
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
20-
final static long DefaultTimeoutMillis = 100;
20+
final static long DEFAULT_TIMEOUT_MILLIS = 100;
2121

2222
private ExecutorService e;
2323
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
2424
@AfterClass void after() { if (e != null) e.shutdown(); }
2525

2626
public AsyncSubscriberTest() {
27-
super(new TestEnvironment(DefaultTimeoutMillis));
27+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
2828
}
2929

3030
@Override public Subscriber<Integer> createSubscriber() {
@@ -53,11 +53,15 @@ public AsyncSubscriberTest() {
5353
};
5454

5555
new NumberIterablePublisher(0, 10, e).subscribe(sub);
56-
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
56+
latch.await(DEFAULT_TIMEOUT_MILLIS * 10, TimeUnit.MILLISECONDS);
5757
assertEquals(i.get(), 45);
5858
}
5959

6060
@Override public Integer createElement(int element) {
6161
return element;
6262
}
63+
64+
@Override public Publisher<Integer> createHelperPublisher(long elements) {
65+
return super.createHelperPublisher(elements);
66+
}
6367
}

tck/README.md

+38-20
Original file line numberDiff line numberDiff line change
@@ -178,36 +178,44 @@ Subscriber rules Verification is split up into two files (styles) of tests.
178178

179179
The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.
180180

181-
### Helper Publisher implementations
181+
### createElement and Helper Publisher implementations
182182
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
183-
both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation.
183+
both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation.
184+
The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution.
184185

185-
For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the
186-
SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification<Integer>`.
187-
The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements,
188-
thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement
189-
an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers
190-
which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher.
186+
While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber
187+
may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such
188+
special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of
189+
Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of
190+
elements flowing into the `Subscriber<T>`, as known thanks to `... extends WhiteboxSubscriberVerification<T>`) representing
191+
an element of the stream that will be passed on to the Subscriber.
191192

192-
If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification<Message>`,
193-
*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message`
194-
(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`)
195-
contains some information and directs the implementer towards implementing a custom helper publisher,
196-
which is done by overriding the `Publisher<T> createHelperPublisher(long elements)` method:
193+
The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types:
194+
195+
```java
196+
public class MySubscriberTest extends SubscriberBlackboxVerification<Integer> {
197+
198+
// ...
199+
200+
@Override
201+
public Integer createElement(int element) { return element; }
202+
}
203+
```
204+
205+
206+
The `createElement` method MAY be called from multiple
207+
threads, so in case of more complicated implementations, please be aware of this fact.
208+
209+
**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher`
210+
which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your
211+
own Publisher which will then be used by the TCK to drive your Subscriber tests:
197212

198213
```java
199214
@Override public Publisher<Message> createHelperPublisher(long elements) {
200215
return new Publisher<Message>() { /* IMPL HERE */ };
201216
}
202217
```
203218

204-
Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK
205-
should be driven using `Integer` elements as default publishers are already implemented for this type. If the
206-
`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher<T>` that will
207-
be able to signal the required element types. It is of course both possible and recommended to re-use existing
208-
implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of
209-
such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215)
210-
211219
### Subscriber Blackbox Verification
212220

213221
Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
@@ -233,6 +241,11 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
233241
public Subscriber<Integer> createSubscriber() {
234242
return new MySubscriber<Integer>();
235243
}
244+
245+
@Override
246+
public Integer createElement(int element) {
247+
return element;
248+
}
236249
}
237250
```
238251

@@ -308,6 +321,11 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
308321
};
309322
}
310323

324+
@Override
325+
public Integer createElement(int element) {
326+
return element;
327+
}
328+
311329
}
312330
```
313331

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.util.concurrent.ExecutorService;
1717
import java.util.concurrent.Executors;
18+
import java.util.concurrent.atomic.AtomicBoolean;
1819

1920
import static org.reactivestreams.tck.Annotations.NotVerified;
2021
import static org.reactivestreams.tck.Annotations.Required;
@@ -224,7 +225,24 @@ public void spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPreced
224225
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
225226
@Override
226227
public void run(BlackboxTestStage stage) throws Throwable {
227-
final Publisher<T> pub = createHelperPublisher(0);
228+
final Publisher<T> pub = new Publisher<T>() {
229+
@Override public void subscribe(final Subscriber<? super T> s) {
230+
s.onSubscribe(new Subscription() {
231+
private boolean completed = false;
232+
233+
@Override public void request(long n) {
234+
if (!completed) {
235+
completed = true;
236+
s.onComplete(); // Publisher now realises that it is in fact already completed
237+
}
238+
}
239+
240+
@Override public void cancel() {
241+
// noop, ignore
242+
}
243+
});
244+
}
245+
};
228246

229247
final Subscriber<T> sub = createSubscriber();
230248
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public HelperPublisher(final int from, final int to, final Function<Integer, T>
2222
else try {
2323
return create.apply(at++);
2424
} catch (Throwable t) {
25-
throw new HelperPublisherException(
26-
String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t);
25+
throw new IllegalStateException(String.format("Failed to create element for id %d!", at - 1), t);
2726
}
2827
}
2928
@Override public void remove() { throw new UnsupportedOperationException(); }

tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java

-7
This file was deleted.

tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public InfiniteHelperPublisher(final Function<Integer, T> create, final Executor
1818
try {
1919
return create.apply(at++); // Wraps around on overflow
2020
} catch (Throwable t) {
21-
throw new HelperPublisherException(
21+
throw new IllegalStateException(
2222
String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t);
2323
}
2424
}

tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,20 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2828
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2929
requireTestSkip(new ThrowingRunnable() {
3030
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor createIdentityProcessor(int bufferSize) {
31+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32+
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3333
return new NoopProcessor();
3434
}
3535

3636
@Override public ExecutorService publisherExecutorService() { return ex; }
3737

38-
@Override public Object createElement(int element) {
39-
return null;
40-
}
38+
@Override public Integer createElement(int element) { return element; }
4139

42-
@Override public Publisher createHelperPublisher(long elements) {
40+
@Override public Publisher<Integer> createHelperPublisher(long elements) {
4341
return SKIP;
4442
}
4543

46-
@Override public Publisher createErrorStatePublisher() {
44+
@Override public Publisher<Integer> createErrorStatePublisher() {
4745
return SKIP;
4846
}
4947

0 commit comments

Comments
 (0)