Skip to content

Commit 9592f96

Browse files
committed
+tck 2.9 blackbox verification should fully drive publisher
It should not use the helper publisher as it may signal asynchronously as well as onComplete in undefined ways - we need to control the publisher in this case because of the very specific test scenario
1 parent c3dd8a1 commit 9592f96

File tree

7 files changed

+45
-39
lines changed

7 files changed

+45
-39
lines changed

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,8 @@ public AsyncSubscriberTest() {
6060
@Override public Integer createElement(int element) {
6161
return element;
6262
}
63+
64+
@Override public Publisher<Integer> createHelperPublisher(long elements) {
65+
return super.createHelperPublisher(elements);
66+
}
6367
}

tck/README.md

+15-21
Original file line numberDiff line numberDiff line change
@@ -180,34 +180,28 @@ The Blackbox Verification tests do not require the implementation under test to
180180

181181
### Helper Publisher implementations
182182
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
183-
both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation.
184-
185-
For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the
186-
SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification<Integer>`.
187-
The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements,
188-
thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement
189-
an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers
190-
which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher.
191-
192-
If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification<Message>`,
193-
*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message`
194-
(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`)
195-
contains some information and directs the implementer towards implementing a custom helper publisher,
196-
which is done by overriding the `Publisher<T> createHelperPublisher(long elements)` method:
183+
both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation.
184+
The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution.
185+
186+
While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber
187+
may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such
188+
special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of
189+
Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of
190+
elements flowing into the `Subscriber<T>`, as known thanks to `... extends WhiteboxSubscriberVerification<T>`) representing
191+
an element of the stream that will be passed on to the Subscriber. The simplest valid implemenation is to return the incoming
192+
`id` *as the element* in a verification using `Integer`s as element types. The `createElement` method MAY be called from multiple
193+
threads, so in case of more complicated implementations, please be aware of this fact.
194+
195+
**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher`
196+
which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your
197+
own Publisher which will then be used by the TCK to drive your Subscriber tests:
197198

198199
```java
199200
@Override public Publisher<Message> createHelperPublisher(long elements) {
200201
return new Publisher<Message>() { /* IMPL HERE */ };
201202
}
202203
```
203204

204-
Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK
205-
should be driven using `Integer` elements as default publishers are already implemented for this type. If the
206-
`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher<T>` that will
207-
be able to signal the required element types. It is of course both possible and recommended to re-use existing
208-
implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of
209-
such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215)
210-
211205
### Subscriber Blackbox Verification
212206

213207
Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.util.concurrent.ExecutorService;
1717
import java.util.concurrent.Executors;
18+
import java.util.concurrent.atomic.AtomicBoolean;
1819

1920
import static org.reactivestreams.tck.Annotations.NotVerified;
2021
import static org.reactivestreams.tck.Annotations.Required;
@@ -224,7 +225,24 @@ public void spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPreced
224225
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
225226
@Override
226227
public void run(BlackboxTestStage stage) throws Throwable {
227-
final Publisher<T> pub = createHelperPublisher(0);
228+
final Publisher<T> pub = new Publisher<T>() {
229+
@Override public void subscribe(final Subscriber<? super T> s) {
230+
s.onSubscribe(new Subscription() {
231+
private boolean completed = false;
232+
233+
@Override public void request(long n) {
234+
if (!completed) {
235+
completed = true;
236+
s.onComplete(); // Publisher now realises that it is in fact already completed
237+
}
238+
}
239+
240+
@Override public void cancel() {
241+
// noop, ignore
242+
}
243+
});
244+
}
245+
};
228246

229247
final Subscriber<T> sub = createSubscriber();
230248
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public HelperPublisher(final int from, final int to, final Function<Integer, T>
2222
else try {
2323
return create.apply(at++);
2424
} catch (Throwable t) {
25-
throw new HelperPublisherException(
26-
String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t);
25+
throw new IllegalStateException(String.format("Failed to create element for id %d!", at - 1), t);
2726
}
2827
}
2928
@Override public void remove() { throw new UnsupportedOperationException(); }

tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java

-7
This file was deleted.

tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public InfiniteHelperPublisher(final Function<Integer, T> create, final Executor
1818
try {
1919
return create.apply(at++); // Wraps around on overflow
2020
} catch (Throwable t) {
21-
throw new HelperPublisherException(
21+
throw new IllegalStateException(
2222
String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t);
2323
}
2424
}

tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,20 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2828
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2929
requireTestSkip(new ThrowingRunnable() {
3030
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor createIdentityProcessor(int bufferSize) {
31+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32+
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3333
return new NoopProcessor();
3434
}
3535

3636
@Override public ExecutorService publisherExecutorService() { return ex; }
3737

38-
@Override public Object createElement(int element) {
39-
return null;
40-
}
38+
@Override public Integer createElement(int element) { return element; }
4139

42-
@Override public Publisher createHelperPublisher(long elements) {
40+
@Override public Publisher<Integer> createHelperPublisher(long elements) {
4341
return SKIP;
4442
}
4543

46-
@Override public Publisher createErrorStatePublisher() {
44+
@Override public Publisher<Integer> createErrorStatePublisher() {
4745
return SKIP;
4846
}
4947

0 commit comments

Comments
 (0)