Skip to content

Commit bca27f5

Browse files
committed
Merge pull request #244 from ktoso/wip-whitebox-better-example
+tck #236 example subscriber whitebox tested, and whitebox fixed
2 parents 954f786 + 99c0cad commit bca27f5

File tree

6 files changed

+163
-90
lines changed

6 files changed

+163
-90
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
import org.reactivestreams.tck.SubscriberBlackboxVerification;
6+
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
15+
@Test // Must be here for TestNG to find and run this, do not remove
16+
public class SyncSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {
17+
18+
private ExecutorService e;
19+
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
20+
@AfterClass void after() { if (e != null) e.shutdown(); }
21+
22+
public SyncSubscriberWhiteboxTest() {
23+
super(new TestEnvironment());
24+
}
25+
26+
@Override
27+
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
28+
return new SyncSubscriber<Integer>() {
29+
@Override
30+
public void onSubscribe(final Subscription s) {
31+
super.onSubscribe(s);
32+
33+
probe.registerOnSubscribe(new SubscriberPuppet() {
34+
@Override
35+
public void triggerRequest(long elements) {
36+
s.request(elements);
37+
}
38+
39+
@Override
40+
public void signalCancel() {
41+
s.cancel();
42+
}
43+
});
44+
}
45+
46+
@Override
47+
public void onNext(Integer element) {
48+
super.onNext(element);
49+
probe.registerOnNext(element);
50+
}
51+
52+
@Override
53+
public void onError(Throwable cause) {
54+
super.onError(cause);
55+
probe.registerOnError(cause);
56+
}
57+
58+
@Override
59+
public void onComplete() {
60+
super.onComplete();
61+
probe.registerOnComplete();
62+
}
63+
64+
@Override
65+
protected boolean foreach(Integer element) {
66+
return true;
67+
}
68+
};
69+
}
70+
71+
@Override public Integer createElement(int element) {
72+
return element;
73+
}
74+
75+
}

tck/README.md

+19-15
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ Based on experiences so far implementing the `SubscriberPuppet` is non-trivial a
309309
We keep the whitebox verification, as it is tremendously useful in the `ProcessorVerification`, where the Puppet is implemented within the TCK and injected to the tests.
310310
We do not expect all implementations to make use of the plain `SubscriberWhiteboxVerification`, using the `SubscriberBlackboxVerification` instead.
311311

312-
A simple synchronous `Subscriber` implementation would look similar to following example:
312+
For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to
313+
exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example:
313314

314315
```java
315316
package com.example.streams;
@@ -326,21 +327,24 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
326327
super(new TestEnvironment());
327328
}
328329

330+
// The implementation under test is "SyncSubscriber":
331+
// class SyncSubscriber<T> extends Subscriber<T> { /* ... */ }
332+
329333
@Override
330334
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
331-
332-
// return YOUR subscriber under-test, with additional WhiteboxSubscriberProbe instrumentation
333-
return new Subscriber<Integer>() {
334-
335+
// in order to test the SyncSubscriber we must instrument it by extending it,
336+
// and calling the WhiteboxSubscriberProbe in all of the Subscribers methods:
337+
return new SyncSubscriber<Integer>() {
335338
@Override
336339
public void onSubscribe(final Subscription s) {
337-
// in addition to normal Subscriber work that you're testing,
338-
// register a SubscriberPuppet, to give the TCK control over demand generation and cancelling
339-
probe.registerOnSubscribe(new SubscriberPuppet() {
340+
super.onSubscribe(s);
340341

342+
// register a successful subscription, and create a Puppet,
343+
// for the WhiteboxVerification to be able to drive its tests:
344+
probe.registerOnSubscribe(new SubscriberPuppet() {
341345
@Override
342-
public void triggerRequest(long n) {
343-
s.request(n);
346+
public void triggerRequest(long elements) {
347+
s.request(elements);
344348
}
345349

346350
@Override
@@ -351,20 +355,20 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
351355
}
352356

353357
@Override
354-
public void onNext(Integer value) {
355-
// in addition to normal Subscriber work that you're testing, register onNext with the probe
356-
probe.registerOnNext(value);
358+
public void onNext(Integer element) {
359+
super.onNext(element);
360+
probe.registerOnNext(element);
357361
}
358362

359363
@Override
360364
public void onError(Throwable cause) {
361-
// in addition to normal Subscriber work that you're testing, register onError with the probe
365+
super.onError(cause);
362366
probe.registerOnError(cause);
363367
}
364368

365369
@Override
366370
public void onComplete() {
367-
// in addition to normal Subscriber work that you're testing, register onComplete with the probe
371+
super.onComplete();
368372
probe.registerOnComplete();
369373
}
370374
};

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
518518
}
519519

520520
@Override @Test
521-
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
521+
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
522522
subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
523523
}
524524

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+66-72
Original file line numberDiff line numberDiff line change
@@ -203,31 +203,35 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
203203

204204
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.5
205205
@Override @Test
206-
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
207-
new WhiteboxTestStage(env) {{
208-
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
209-
final Latch secondSubscriptionCancelled = new Latch(env);
210-
sub().onSubscribe(
211-
new Subscription() {
212-
@Override
213-
public void request(long elements) {
214-
env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`", sub(), elements));
215-
}
206+
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
207+
subscriberTest(new TestStageTestRun() {
208+
@Override
209+
public void run(WhiteboxTestStage stage) throws Throwable {
210+
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
211+
final Latch secondSubscriptionCancelled = new Latch(env);
212+
final Subscriber<? super T> sub = stage.sub();
213+
final Subscription subscription = new Subscription() {
214+
@Override
215+
public void request(long elements) {
216+
// ignore...
217+
}
216218

217-
@Override
218-
public void cancel() {
219-
secondSubscriptionCancelled.close();
220-
}
219+
@Override
220+
public void cancel() {
221+
secondSubscriptionCancelled.close();
222+
}
221223

222-
@Override
223-
public String toString() {
224-
return "SecondSubscription(should get cancelled)";
225-
}
226-
});
224+
@Override
225+
public String toString() {
226+
return "SecondSubscription(should get cancelled)";
227+
}
228+
};
229+
sub.onSubscribe(subscription);
227230

228-
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
229-
env.verifyNoAsyncErrors();
230-
}};
231+
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
232+
env.verifyNoAsyncErrors();
233+
}
234+
});
231235
}
232236

233237
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.6
@@ -348,50 +352,38 @@ public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParame
348352
@Override
349353
public void run(WhiteboxTestStage stage) throws Throwable {
350354

351-
{
352-
final Subscriber<? super T> sub = stage.sub();
353-
boolean gotNPE = false;
354-
try {
355-
sub.onSubscribe(null);
356-
} catch (final NullPointerException expected) {
357-
gotNPE = true;
358-
}
359-
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
355+
final Subscriber<? super T> sub = stage.sub();
356+
boolean gotNPE = false;
357+
try {
358+
sub.onSubscribe(null);
359+
} catch (final NullPointerException expected) {
360+
gotNPE = true;
360361
}
362+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
361363

362-
env.verifyNoAsyncErrors();
364+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
363365
}
364366
});
365-
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
367+
}
368+
369+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
366370
@Override @Test
367371
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
368372
subscriberTest(new TestStageTestRun() {
369373
@Override
370374
public void run(WhiteboxTestStage stage) throws Throwable {
371375

372-
final Subscription subscription = new Subscription() {
373-
@Override
374-
public void request(final long elements) {
375-
}
376-
377-
@Override
378-
public void cancel() {
379-
}
380-
};
381-
382-
{
383-
final Subscriber<? super T> sub = stage.sub();
384-
boolean gotNPE = false;
385-
sub.onSubscribe(subscription);
386-
try {
387-
sub.onNext(null);
388-
} catch (final NullPointerException expected) {
389-
gotNPE = true;
390-
}
376+
final Subscriber<? super T> sub = stage.sub();
377+
boolean gotNPE = false;
378+
try {
379+
sub.onNext(null);
380+
} catch (final NullPointerException expected) {
381+
gotNPE = true;
382+
} finally {
391383
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
392384
}
393385

394-
env.verifyNoAsyncErrors();
386+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
395387
}
396388
});
397389
}
@@ -403,29 +395,17 @@ public void required_spec213_onError_mustThrowNullPointerExceptionWhenParameters
403395
@Override
404396
public void run(WhiteboxTestStage stage) throws Throwable {
405397

406-
final Subscription subscription = new Subscription() {
407-
@Override
408-
public void request(final long elements) {
409-
}
410-
411-
@Override
412-
public void cancel() {
413-
}
414-
};
415-
416-
{
417398
final Subscriber<? super T> sub = stage.sub();
418399
boolean gotNPE = false;
419-
sub.onSubscribe(subscription);
420400
try {
421401
sub.onError(null);
422402
} catch (final NullPointerException expected) {
423403
gotNPE = true;
404+
} finally {
405+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
424406
}
425-
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
426-
}
427407

428-
env.verifyNoAsyncErrors();
408+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
429409
}
430410
});
431411
}
@@ -495,11 +475,24 @@ abstract class TestStageTestRun {
495475
public abstract void run(WhiteboxTestStage stage) throws Throwable;
496476
}
497477

478+
/**
479+
* Prepares subscriber and publisher pair (by subscribing the first to the latter),
480+
* and then hands over the tests {@link WhiteboxTestStage} over to the test.
481+
*
482+
* The test stage is, like in a puppet show, used to orchestrate what each participant should do.
483+
* Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
484+
*/
498485
public void subscriberTest(TestStageTestRun body) throws Throwable {
499486
WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
500487
body.run(stage);
501488
}
502489

490+
/**
491+
* Provides a {@link WhiteboxTestStage} without performing any additional setup,
492+
* like the {@link org.reactivestreams.tck.SubscriberWhiteboxVerification#subscriberTest(TestStageTestRun)} would.
493+
*
494+
* Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
495+
*/
503496
public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
504497
WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
505498
body.run(stage);
@@ -559,7 +552,10 @@ public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment
559552
}
560553

561554
public T signalNext() throws InterruptedException {
562-
T element = nextT();
555+
return signalNext(nextT());
556+
}
557+
558+
private T signalNext(T element) throws InterruptedException {
563559
sendNext(element);
564560
return element;
565561
}
@@ -577,7 +573,7 @@ public void verifyNoAsyncErrors() {
577573
/**
578574
* This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
579575
* in order to allow intercepting calls on the underlying {@code Subscriber}.
580-
* This delegation allows the proxy to implement {@link org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxProbe} assertions.
576+
* This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
581577
*/
582578
public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
583579

@@ -741,8 +737,6 @@ private SubscriberPuppet puppet() {
741737
public void registerOnSubscribe(SubscriberPuppet p) {
742738
if (!puppet.isCompleted()) {
743739
puppet.complete(p);
744-
} else {
745-
env.flop(String.format("Subscriber %s illegally accepted a second Subscription", sub()));
746740
}
747741
}
748742

tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public interface SubscriberWhiteboxVerificationRules {
1111
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable;
1212
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable;
1313
void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception;
14-
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception;
14+
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable;
1515
void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception;
1616
void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception;
1717
void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable;

tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
162162
}
163163
}).required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
164164
}
165-
}, "illegally accepted a second Subscription");
165+
}, "Expected 2nd Subscription given to subscriber to be cancelled");
166166
}
167167

168168
@Test

0 commit comments

Comments
 (0)