Skip to content

Commit 9f2a05f

Browse files
committed
Fixes #233 by implementing support for triggered demand in in the SubscriberBlackboxVerification
1 parent 0b59ebe commit 9f2a05f

File tree

4 files changed

+259
-0
lines changed

4 files changed

+259
-0
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+12
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ protected SubscriberBlackboxVerification(TestEnvironment env) {
4848
*/
4949
public abstract Subscriber<T> createSubscriber();
5050

51+
/**
52+
* Override this method if the Subscriber implementation you are verifying
53+
* needs an external signal before it signals demand to its Publisher.
54+
*
55+
* By default this method does nothing.
56+
*/
57+
public void triggerRequest(final Subscriber<? super T> subscriber) {
58+
59+
}
60+
5161
// ENV SETUP
5262

5363
/**
@@ -74,6 +84,7 @@ public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() t
7484
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
7585
@Override
7686
public void run(BlackboxTestStage stage) throws InterruptedException {
87+
triggerRequest(stage.subProxy().sub());
7788
final long n = stage.expectRequest();// assuming subscriber wants to consume elements...
7889

7990
// should cope with up to requested number of elements
@@ -248,6 +259,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
248259
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
249260

250261
pub.subscribe(probe);
262+
triggerRequest(sub);
251263
probe.expectCompletion();
252264
probe.expectNone();
253265

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.reactivestreams.tck;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
import org.reactivestreams.tck.SubscriberBlackboxVerification;
6+
import org.reactivestreams.tck.TestEnvironment;
7+
import org.testng.annotations.AfterClass;
8+
import org.testng.annotations.BeforeClass;
9+
import org.testng.annotations.Test;
10+
11+
import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber;
12+
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
16+
@Test // Must be here for TestNG to find and run this, do not remove
17+
public class SyncTriggeredDemandSubscriberTest extends SubscriberBlackboxVerification<Integer> {
18+
19+
private ExecutorService e;
20+
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
21+
@AfterClass void after() { if (e != null) e.shutdown(); }
22+
23+
public SyncTriggeredDemandSubscriberTest() {
24+
super(new TestEnvironment());
25+
}
26+
27+
@Override public void triggerRequest(final Subscriber<? super Integer> subscriber) {
28+
((SyncTriggeredDemandSubscriber<? super Integer>)subscriber).triggerDemand(1);
29+
}
30+
31+
@Override public Subscriber<Integer> createSubscriber() {
32+
return new SyncTriggeredDemandSubscriber<Integer>() {
33+
private long acc;
34+
@Override protected long foreach(final Integer element) {
35+
acc += element;
36+
return 1;
37+
}
38+
39+
@Override public void onComplete() {
40+
}
41+
};
42+
}
43+
44+
@Override public Integer createElement(int element) {
45+
return element;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.reactivestreams.tck;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
import org.reactivestreams.tck.SubscriberBlackboxVerification;
6+
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber;
13+
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
17+
@Test // Must be here for TestNG to find and run this, do not remove
18+
public class SyncTriggeredDemandSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {
19+
20+
private ExecutorService e;
21+
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
22+
@AfterClass void after() { if (e != null) e.shutdown(); }
23+
24+
public SyncTriggeredDemandSubscriberWhiteboxTest() {
25+
super(new TestEnvironment());
26+
}
27+
28+
@Override
29+
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
30+
return new SyncTriggeredDemandSubscriber<Integer>() {
31+
@Override
32+
public void onSubscribe(final Subscription s) {
33+
super.onSubscribe(s);
34+
35+
probe.registerOnSubscribe(new SubscriberPuppet() {
36+
@Override
37+
public void triggerRequest(long elements) {
38+
s.request(elements);
39+
}
40+
41+
@Override
42+
public void signalCancel() {
43+
s.cancel();
44+
}
45+
});
46+
}
47+
48+
@Override
49+
public void onNext(Integer element) {
50+
super.onNext(element);
51+
probe.registerOnNext(element);
52+
}
53+
54+
@Override
55+
public void onError(Throwable cause) {
56+
super.onError(cause);
57+
probe.registerOnError(cause);
58+
}
59+
60+
@Override
61+
public void onComplete() {
62+
super.onComplete();
63+
probe.registerOnComplete();
64+
}
65+
66+
@Override
67+
protected long foreach(Integer element) {
68+
return 1;
69+
}
70+
};
71+
}
72+
73+
@Override public Integer createElement(int element) {
74+
return element;
75+
}
76+
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package org.reactivestreams.tck.support;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
6+
/**
7+
* SyncTriggeredDemandSubscriber is an implementation of Reactive Streams `Subscriber`,
8+
* it runs synchronously (on the Publisher's thread) and requests demand triggered from
9+
* "the outside" using its `triggerDemand` method and from "the inside" using the return
10+
* value of its user-defined `foreach` method which is invoked to process each element.
11+
*
12+
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
13+
*/
14+
public abstract class SyncTriggeredDemandSubscriber<T> implements Subscriber<T> {
15+
private Subscription subscription; // Obeying rule 3.1, we make this private!
16+
private boolean done = false;
17+
18+
@Override public void onSubscribe(final Subscription s) {
19+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
20+
if (s == null) throw null;
21+
22+
if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
23+
try {
24+
s.cancel(); // Cancel the additional subscription
25+
} catch(final Throwable t) {
26+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
27+
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
28+
}
29+
} else {
30+
// We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
31+
// Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
32+
subscription = s;
33+
}
34+
}
35+
36+
/**
37+
* Requests the provided number of elements from the `Subscription` of this `Subscriber`.
38+
* NOTE: This makes no attempt at thread safety so only invoke it once from the outside to initiate the demand.
39+
* @return `true` if successful and `false` if not (either due to no `Subscription` or due to exceptions thrown)
40+
*/
41+
public boolean triggerDemand(final long n) {
42+
final Subscription s = subscription;
43+
if (s == null) return false;
44+
else {
45+
try {
46+
s.request(n);
47+
} catch(final Throwable t) {
48+
// Subscription.request is not allowed to throw according to rule 3.16
49+
(new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
50+
return false;
51+
}
52+
return true;
53+
}
54+
}
55+
56+
@Override public void onNext(final T element) {
57+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
58+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
59+
} else {
60+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
61+
if (element == null) throw null;
62+
63+
if (!done) { // If we aren't already done
64+
try {
65+
final long need = foreach(element);
66+
if (need > 0) triggerDemand(need);
67+
else if (need == 0) {}
68+
else {
69+
done();
70+
}
71+
} catch (final Throwable t) {
72+
done();
73+
try {
74+
onError(t);
75+
} catch (final Throwable t2) {
76+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
77+
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
// Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
85+
// herefor we also need to cancel our `Subscription`.
86+
private void done() {
87+
//On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
88+
done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements)
89+
try {
90+
subscription.cancel(); // Cancel the subscription
91+
} catch(final Throwable t) {
92+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
93+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
94+
}
95+
}
96+
97+
// This method is left as an exercise to the reader/extension point
98+
// Don't forget to call `triggerDemand` at the end if you are interested in more data,
99+
// a return value of < 0 indicates that the subscription should be cancelled,
100+
// a value of 0 indicates that there is no current need,
101+
// a value of > 0 indicates the current need.
102+
protected abstract long foreach(final T element);
103+
104+
@Override public void onError(final Throwable t) {
105+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
106+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
107+
} else {
108+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
109+
if (t == null) throw null;
110+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
111+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
112+
}
113+
}
114+
115+
@Override public void onComplete() {
116+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
117+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
118+
} else {
119+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
120+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)