Skip to content

Commit a7878e2

Browse files
committed
+tck reactive-streams#236 example subscriber whitebox tested, and whitebox fixed
1 parent b33420a commit a7878e2

File tree

6 files changed

+163
-89
lines changed

6 files changed

+163
-89
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
import org.reactivestreams.tck.SubscriberBlackboxVerification;
6+
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
15+
@Test // Must be here for TestNG to find and run this, do not remove
16+
public class SyncSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {
17+
18+
private ExecutorService e;
19+
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
20+
@AfterClass void after() { if (e != null) e.shutdown(); }
21+
22+
public SyncSubscriberWhiteboxTest() {
23+
super(new TestEnvironment());
24+
}
25+
26+
@Override
27+
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
28+
return new SyncSubscriber<Integer>() {
29+
@Override
30+
public void onSubscribe(final Subscription s) {
31+
super.onSubscribe(s);
32+
33+
probe.registerOnSubscribe(new SubscriberPuppet() {
34+
@Override
35+
public void triggerRequest(long elements) {
36+
s.request(elements);
37+
}
38+
39+
@Override
40+
public void signalCancel() {
41+
s.cancel();
42+
}
43+
});
44+
}
45+
46+
@Override
47+
public void onNext(Integer element) {
48+
super.onNext(element);
49+
probe.registerOnNext(element);
50+
}
51+
52+
@Override
53+
public void onError(Throwable cause) {
54+
super.onError(cause);
55+
probe.registerOnError(cause);
56+
}
57+
58+
@Override
59+
public void onComplete() {
60+
super.onComplete();
61+
probe.registerOnComplete();
62+
}
63+
64+
@Override
65+
protected boolean foreach(Integer element) {
66+
return true;
67+
}
68+
};
69+
}
70+
71+
@Override public Integer createElement(int element) {
72+
return element;
73+
}
74+
75+
}

tck/README.md

+19-15
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ Based on experiences so far implementing the `SubscriberPuppet` is non-trivial a
288288
We keep the whitebox verification, as it is tremendously useful in the `ProcessorVerification`, where the Puppet is implemented within the TCK and injected to the tests.
289289
We do not expect all implementations to make use of the plain `SubscriberWhiteboxVerification`, using the `SubscriberBlackboxVerification` instead.
290290

291-
A simple synchronous `Subscriber` implementation would look similar to following example:
291+
For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to
292+
exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example:
292293

293294
```java
294295
package com.example.streams;
@@ -305,21 +306,24 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
305306
super(new TestEnvironment());
306307
}
307308

309+
// The implementation under test is "SyncSubscriber":
310+
// class SyncSubscriber<T> extends Subscriber<T> { /* ... */ }
311+
308312
@Override
309313
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
310-
311-
// return YOUR subscriber under-test, with additional WhiteboxSubscriberProbe instrumentation
312-
return new Subscriber<Integer>() {
313-
314+
// in order to test the SyncSubscriber we must instrument it by extending it,
315+
// and calling the WhiteboxSubscriberProbe in all of the Subscribers methods:
316+
return new SyncSubscriber<Integer>() {
314317
@Override
315318
public void onSubscribe(final Subscription s) {
316-
// in addition to normal Subscriber work that you're testing,
317-
// register a SubscriberPuppet, to give the TCK control over demand generation and cancelling
318-
probe.registerOnSubscribe(new SubscriberPuppet() {
319+
super.onSubscribe(s);
319320

321+
// register a successful subscription, and create a Puppet,
322+
// for the WhiteboxVerification to be able to drive its tests:
323+
probe.registerOnSubscribe(new SubscriberPuppet() {
320324
@Override
321-
public void triggerRequest(long n) {
322-
s.request(n);
325+
public void triggerRequest(long elements) {
326+
s.request(elements);
323327
}
324328

325329
@Override
@@ -330,20 +334,20 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
330334
}
331335

332336
@Override
333-
public void onNext(Integer value) {
334-
// in addition to normal Subscriber work that you're testing, register onNext with the probe
335-
probe.registerOnNext(value);
337+
public void onNext(Integer element) {
338+
super.onNext(element);
339+
probe.registerOnNext(element);
336340
}
337341

338342
@Override
339343
public void onError(Throwable cause) {
340-
// in addition to normal Subscriber work that you're testing, register onError with the probe
344+
super.onError(cause);
341345
probe.registerOnError(cause);
342346
}
343347

344348
@Override
345349
public void onComplete() {
346-
// in addition to normal Subscriber work that you're testing, register onComplete with the probe
350+
super.onComplete();
347351
probe.registerOnComplete();
348352
}
349353
};

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
514514
}
515515

516516
@Override @Test
517-
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
517+
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
518518
subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
519519
}
520520

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+66-71
Original file line numberDiff line numberDiff line change
@@ -203,31 +203,35 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
203203

204204
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.5
205205
@Override @Test
206-
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
207-
new WhiteboxTestStage(env) {{
208-
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
209-
final Latch secondSubscriptionCancelled = new Latch(env);
210-
sub().onSubscribe(
211-
new Subscription() {
212-
@Override
213-
public void request(long elements) {
214-
env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`", sub(), elements));
215-
}
206+
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
207+
subscriberTest(new TestStageTestRun() {
208+
@Override
209+
public void run(WhiteboxTestStage stage) throws Throwable {
210+
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
211+
final Latch secondSubscriptionCancelled = new Latch(env);
212+
final Subscriber<? super T> sub = stage.sub();
213+
final Subscription subscription = new Subscription() {
214+
@Override
215+
public void request(long elements) {
216+
// ignore...
217+
}
216218

217-
@Override
218-
public void cancel() {
219-
secondSubscriptionCancelled.close();
220-
}
219+
@Override
220+
public void cancel() {
221+
secondSubscriptionCancelled.close();
222+
}
221223

222-
@Override
223-
public String toString() {
224-
return "SecondSubscription(should get cancelled)";
225-
}
226-
});
224+
@Override
225+
public String toString() {
226+
return "SecondSubscription(should get cancelled)";
227+
}
228+
};
229+
sub.onSubscribe(subscription);
227230

228-
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
229-
env.verifyNoAsyncErrors();
230-
}};
231+
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
232+
env.verifyNoAsyncErrors();
233+
}
234+
});
231235
}
232236

233237
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.6
@@ -348,50 +352,38 @@ public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParame
348352
@Override
349353
public void run(WhiteboxTestStage stage) throws Throwable {
350354

351-
{
352-
final Subscriber<? super T> sub = stage.sub();
353-
boolean gotNPE = false;
354-
try {
355-
sub.onSubscribe(null);
356-
} catch (final NullPointerException expected) {
357-
gotNPE = true;
358-
}
359-
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
355+
final Subscriber<? super T> sub = stage.sub();
356+
boolean gotNPE = false;
357+
try {
358+
sub.onSubscribe(null);
359+
} catch (final NullPointerException expected) {
360+
gotNPE = true;
360361
}
362+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
361363

362-
env.verifyNoAsyncErrors();
364+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
363365
}
364366
});
365-
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
367+
}
368+
369+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
366370
@Override @Test
367371
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
368372
subscriberTest(new TestStageTestRun() {
369373
@Override
370374
public void run(WhiteboxTestStage stage) throws Throwable {
371375

372-
final Subscription subscription = new Subscription() {
373-
@Override
374-
public void request(final long elements) {
375-
}
376-
377-
@Override
378-
public void cancel() {
379-
}
380-
};
381-
382-
{
383-
final Subscriber<? super T> sub = stage.sub();
384-
boolean gotNPE = false;
385-
sub.onSubscribe(subscription);
386-
try {
387-
sub.onNext(null);
388-
} catch (final NullPointerException expected) {
389-
gotNPE = true;
390-
}
376+
final Subscriber<? super T> sub = stage.sub();
377+
boolean gotNPE = false;
378+
try {
379+
sub.onNext(null);
380+
} catch (final NullPointerException expected) {
381+
gotNPE = true;
382+
} finally {
391383
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
392384
}
393385

394-
env.verifyNoAsyncErrors();
386+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
395387
}
396388
});
397389
}
@@ -403,29 +395,17 @@ public void required_spec213_onError_mustThrowNullPointerExceptionWhenParameters
403395
@Override
404396
public void run(WhiteboxTestStage stage) throws Throwable {
405397

406-
final Subscription subscription = new Subscription() {
407-
@Override
408-
public void request(final long elements) {
409-
}
410-
411-
@Override
412-
public void cancel() {
413-
}
414-
};
415-
416-
{
417398
final Subscriber<? super T> sub = stage.sub();
418399
boolean gotNPE = false;
419-
sub.onSubscribe(subscription);
420400
try {
421401
sub.onError(null);
422402
} catch (final NullPointerException expected) {
423403
gotNPE = true;
404+
} finally {
405+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
424406
}
425-
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
426-
}
427407

428-
env.verifyNoAsyncErrors();
408+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
429409
}
430410
});
431411
}
@@ -495,11 +475,24 @@ abstract class TestStageTestRun {
495475
public abstract void run(WhiteboxTestStage stage) throws Throwable;
496476
}
497477

478+
/**
479+
* Prepares subscriber and publisher pair (by subscribing the first to the latter),
480+
* and then hands over the tests {@link WhiteboxTestStage} over to the test.
481+
*
482+
* The test stage is, like in a puppet show, used to orchestrate what each participant should do.
483+
* Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
484+
*/
498485
public void subscriberTest(TestStageTestRun body) throws Throwable {
499486
WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
500487
body.run(stage);
501488
}
502489

490+
/**
491+
* Provides a {@link WhiteboxTestStage} without performing any additional setup,
492+
* like the {@link org.reactivestreams.tck.SubscriberWhiteboxVerification#subscriberTest(TestStageTestRun)} would.
493+
*
494+
* Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
495+
*/
503496
public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
504497
WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
505498
body.run(stage);
@@ -560,6 +553,10 @@ public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment
560553

561554
public T signalNext() throws InterruptedException {
562555
T element = nextT();
556+
return signalNext(element);
557+
}
558+
559+
public T signalNext(T element) throws InterruptedException {
563560
sendNext(element);
564561
return element;
565562
}
@@ -577,7 +574,7 @@ public void verifyNoAsyncErrors() {
577574
/**
578575
* This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
579576
* in order to allow intercepting calls on the underlying {@code Subscriber}.
580-
* This delegation allows the proxy to implement {@link org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxProbe} assertions.
577+
* This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
581578
*/
582579
public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
583580

@@ -741,8 +738,6 @@ private SubscriberPuppet puppet() {
741738
public void registerOnSubscribe(SubscriberPuppet p) {
742739
if (!puppet.isCompleted()) {
743740
puppet.complete(p);
744-
} else {
745-
env.flop(String.format("Subscriber %s illegally accepted a second Subscription", sub()));
746741
}
747742
}
748743

tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public interface SubscriberWhiteboxVerificationRules {
1111
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable;
1212
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable;
1313
void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception;
14-
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception;
14+
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable;
1515
void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception;
1616
void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception;
1717
void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable;

tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
162162
}
163163
}).required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
164164
}
165-
}, "illegally accepted a second Subscription");
165+
}, "Expected 2nd Subscription given to subscriber to be cancelled");
166166
}
167167

168168
@Test

0 commit comments

Comments
 (0)