Skip to content

Commit 82b342d

Browse files
committed
Merge pull request #185 from reactive-streams/wip-async-subscriber-example-√
Adds an example how to implement an asynchronous Subscriber,
2 parents a33795f + 39165b6 commit 82b342d

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.util.concurrent.Executor;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.concurrent.ConcurrentLinkedQueue;
10+
11+
/**
12+
* AsyncSubscriber is an implementation of Reactive Streams `Subscriber`,
13+
* it runs asynchronously (on an Executor), requests one element
14+
* at a time, and invokes a user-defined method to process each element.
15+
*
16+
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
17+
*/
18+
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
19+
20+
// Signal represents the asynchronous protocol between the Publisher and Subscriber
21+
private static interface Signal {}
22+
23+
private enum OnComplete implements Signal { Instance; }
24+
25+
private static class OnError implements Signal {
26+
public final Throwable error;
27+
public OnError(final Throwable error) { this.error = error; }
28+
}
29+
30+
private static class OnNext<T> implements Signal {
31+
public final T next;
32+
public OnNext(final T next) { this.next = next; }
33+
}
34+
35+
private static class OnSubscribe implements Signal {
36+
public final Subscription subscription;
37+
public OnSubscribe(final Subscription subscription) { this.subscription = subscription; }
38+
}
39+
40+
private Subscription subscription; // Obeying rule 3.1, we make this private!
41+
private boolean done; // It's useful to keep track of whether this Subscriber is done or not
42+
private final Executor executor; // This is the Executor we'll use to be asynchronous, obeying rule 2.2
43+
44+
// Only one constructor, and it's only accessible for the subclasses
45+
protected AsyncSubscriber(Executor executor) {
46+
if (executor == null) throw null;
47+
this.executor = executor;
48+
}
49+
50+
// Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
51+
// herefor we also need to cancel our `Subscription`.
52+
private final void done() {
53+
//On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
54+
done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements)
55+
if (subscription != null) { // If we are bailing out before we got a `Subscription` there's little need for cancelling it.
56+
try {
57+
subscription.cancel(); // Cancel the subscription
58+
} catch(final Throwable t) {
59+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
60+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
61+
}
62+
}
63+
}
64+
65+
// This method is invoked when the OnNext signals arrive
66+
// Returns whether more elements are desired or not, and if no more elements are desired,
67+
// for convenience.
68+
protected abstract boolean whenNext(final T element);
69+
70+
// This method is invoked when the OnComplete signal arrives
71+
// override this method to implement your own custom onComplete logic.
72+
protected void whenComplete() { }
73+
74+
// This method is invoked if the OnError signal arrives
75+
// override this method to implement your own custom onError logic.
76+
protected void whenError(Throwable error) { }
77+
78+
private final void handleOnSubscribe(final Subscription s) {
79+
if (s == null) {
80+
// Getting a null `Subscription` here is not valid so lets just ignore it.
81+
} else if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
82+
try {
83+
s.cancel(); // Cancel the additional subscription to follow rule 2.5
84+
} catch(final Throwable t) {
85+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
86+
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
87+
}
88+
} else {
89+
// We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
90+
// Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
91+
subscription = s;
92+
try {
93+
// If we want elements, according to rule 2.1 we need to call `request`
94+
// And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method
95+
s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
96+
} catch(final Throwable t) {
97+
// Subscription.request is not allowed to throw according to rule 3.16
98+
(new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
99+
}
100+
}
101+
}
102+
103+
private final void handleOnNext(final T element) {
104+
if (!done) { // If we aren't already done
105+
if(subscription == null) { // Check for spec violation of 2.1
106+
(new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
107+
} else {
108+
try {
109+
if (whenNext(element)) {
110+
try {
111+
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
112+
} catch(final Throwable t) {
113+
// Subscription.request is not allowed to throw according to rule 3.16
114+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
115+
}
116+
} else {
117+
done(); // This is legal according to rule 2.6
118+
}
119+
} catch(final Throwable t) {
120+
done();
121+
try {
122+
onError(t);
123+
} catch(final Throwable t2) {
124+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
125+
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
126+
}
127+
}
128+
}
129+
}
130+
}
131+
132+
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
133+
private void handleOnComplete() {
134+
done = true; // Obey rule 2.4
135+
whenComplete();
136+
}
137+
138+
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
139+
private void handleOnError(final Throwable error) {
140+
done = true; // Obey rule 2.4
141+
whenError(error);
142+
}
143+
144+
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
145+
146+
@Override public final void onSubscribe(final Subscription s) {
147+
signal(new OnSubscribe(s));
148+
}
149+
150+
@Override public final void onNext(final T element) {
151+
signal(new OnNext<T>(element));
152+
}
153+
154+
@Override public final void onError(final Throwable t) {
155+
signal(new OnError(t));
156+
}
157+
158+
@Override public final void onComplete() {
159+
signal(OnComplete.Instance);
160+
}
161+
162+
// This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscriber`, like `OnComplete` and `OnNext` ,
163+
// and obeying rule 2.11
164+
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
165+
166+
// We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
167+
// obeying rule 2.7 and 2.11
168+
private final AtomicBoolean on = new AtomicBoolean(false);
169+
170+
@SuppressWarnings("unchecked")
171+
@Override public final void run() {
172+
if(on.get()) { // establishes a happens-before relationship with the end of the previous run
173+
try {
174+
final Signal s = inboundSignals.poll(); // We take a signal off the queue
175+
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
176+
177+
// Below we simply unpack the `Signal`s and invoke the corresponding methods
178+
if (s instanceof OnNext<?>)
179+
handleOnNext(((OnNext<T>)s).next);
180+
else if (s instanceof OnSubscribe)
181+
handleOnSubscribe(((OnSubscribe)s).subscription);
182+
else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
183+
handleOnError(((OnError)s).error);
184+
else if (s == OnComplete.Instance) // We are always able to handle OnError, obeying rule 2.9
185+
handleOnComplete();
186+
}
187+
} finally {
188+
on.set(false); // establishes a happens-before relationship with the beginning of the next run
189+
if(!inboundSignals.isEmpty()) // If we still have signals to process
190+
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
191+
}
192+
}
193+
}
194+
195+
// What `signal` does is that it sends signals to the `Subscription` asynchronously
196+
private void signal(final Signal signal) {
197+
if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
198+
tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
199+
}
200+
201+
// This method makes sure that this `Subscriber` is only executing on one Thread at a time
202+
private final void tryScheduleToExecute() {
203+
if(on.compareAndSet(false, true)) {
204+
try {
205+
executor.execute(this);
206+
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
207+
if (!done) {
208+
try {
209+
done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
210+
} finally {
211+
inboundSignals.clear(); // We're not going to need these anymore
212+
// This subscription is cancelled by now, but letting the Subscriber become schedulable again means
213+
// that we can drain the inboundSignals queue if anything arrives after clearing
214+
on.set(false);
215+
}
216+
}
217+
}
218+
}
219+
}
220+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import java.util.Collections;
4+
import java.util.Iterator;
5+
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.tck.SubscriberBlackboxVerification;
8+
import org.reactivestreams.tck.TestEnvironment;
9+
import org.testng.annotations.Test;
10+
import static org.testng.Assert.assertEquals;
11+
12+
import org.testng.annotations.BeforeClass;
13+
import org.testng.annotations.AfterClass;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.atomic.AtomicLong;
18+
import java.util.concurrent.TimeUnit;
19+
20+
@Test // Must be here for TestNG to find and run this, do not remove
21+
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
22+
final static long DefaultTimeoutMillis = 100;
23+
24+
private ExecutorService e;
25+
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
26+
@AfterClass void after() { if (e != null) e.shutdown(); }
27+
28+
public AsyncSubscriberTest() {
29+
super(new TestEnvironment(DefaultTimeoutMillis));
30+
}
31+
32+
@Override public Subscriber<Integer> createSubscriber() {
33+
return new AsyncSubscriber<Integer>(e) {
34+
private long acc;
35+
@Override protected boolean whenNext(final Integer element) {
36+
return true;
37+
}
38+
};
39+
}
40+
@SuppressWarnings("unchecked")
41+
@Override public Publisher<Integer> createHelperPublisher(long elements) {
42+
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
43+
else return new NumberIterablePublisher(0, (int)elements, e);
44+
}
45+
46+
@Test public void testAccumulation() throws InterruptedException {
47+
48+
final AtomicLong i = new AtomicLong(Long.MIN_VALUE);
49+
final CountDownLatch latch = new CountDownLatch(1);
50+
final Subscriber<Integer> sub = new AsyncSubscriber<Integer>(e) {
51+
private long acc;
52+
@Override protected boolean whenNext(final Integer element) {
53+
acc += element;
54+
return true;
55+
}
56+
57+
@Override protected void whenComplete() {
58+
i.set(acc);
59+
latch.countDown();
60+
}
61+
};
62+
63+
new NumberIterablePublisher<Integer>(0, 10, e).subscribe(sub);
64+
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
65+
assertEquals(i.get(), 45);
66+
}
67+
}

0 commit comments

Comments
 (0)