Skip to content

Commit 7021f18

Browse files
akarnokdviktorklang
authored andcommitted
Example synchronous range Publisher (#395)
* Example synchronous range Publisher * Udpated with rule numbers in comments * Mentioning rule 3.9 again in emit() * Move classes to the unicast package.
1 parent 68e0713 commit 7021f18

File tree

2 files changed

+274
-0
lines changed

2 files changed

+274
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.example.unicast;
13+
14+
import org.reactivestreams.*;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
18+
/**
19+
* A synchronous implementation of the {@link Publisher} that can
20+
* be subscribed to multiple times and each individual subscription
21+
* will receive range of monotonically increasing integer values on demand.
22+
*/
23+
public final class RangePublisher implements Publisher<Integer> {
24+
25+
/** The starting value of the range. */
26+
final int start;
27+
28+
/** The number of items to emit. */
29+
final int count;
30+
31+
/**
32+
* Constructs a RangePublisher instance with the given start and count values
33+
* that yields a sequence of [start, start + count).
34+
* @param start the starting value of the range
35+
* @param count the number of items to emit
36+
*/
37+
public RangePublisher(int start, int count) {
38+
this.start = start;
39+
this.count = count;
40+
}
41+
42+
@Override
43+
public void subscribe(Subscriber<? super Integer> subscriber) {
44+
// As per rule 1.11, we have decided to support multiple subscribers
45+
// in a unicast configuration for this `Publisher` implementation.
46+
47+
// As per rule 1.09, we need to throw a `java.lang.NullPointerException`
48+
// if the `Subscriber` is `null`
49+
if (subscriber == null) throw null;
50+
51+
// As per 2.13, this method must return normally (i.e. not throw).
52+
try {
53+
subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
54+
} catch (Throwable ex) {
55+
new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " +
56+
"by throwing an exception from onSubscribe.", ex)
57+
// When onSubscribe fails this way, we don't know what state the
58+
// subscriber is thus calling onError may cause more crashes.
59+
.printStackTrace();
60+
}
61+
}
62+
63+
/**
64+
* A Subscription implementation that holds the current downstream
65+
* requested amount and responds to the downstream's request() and
66+
* cancel() calls.
67+
*/
68+
static final class RangeSubscription
69+
// We are using this `AtomicLong` to make sure that this `Subscription`
70+
// doesn't run concurrently with itself, which would violate rule 1.3
71+
// among others (no concurrent notifications).
72+
// The atomic transition from 0L to N > 0L will ensure this.
73+
extends AtomicLong implements Subscription {
74+
75+
private static final long serialVersionUID = -9000845542177067735L;
76+
77+
/** The Subscriber we are emitting integer values to. */
78+
final Subscriber<? super Integer> downstream;
79+
80+
/** The end index (exclusive). */
81+
final int end;
82+
83+
/**
84+
* The current index and within the [start, start + count) range that
85+
* will be emitted as downstream.onNext().
86+
*/
87+
int index;
88+
89+
/**
90+
* Indicates the emission should stop.
91+
*/
92+
volatile boolean cancelled;
93+
94+
/**
95+
* Holds onto the IllegalArgumentException (containing the offending stacktrace)
96+
* indicating there was a non-positive request() call from the downstream.
97+
*/
98+
volatile Throwable invalidRequest;
99+
100+
/**
101+
* Constructs a stateful RangeSubscription that emits signals to the given
102+
* downstream from an integer range of [start, end).
103+
* @param downstream the Subscriber receiving the integer values and the completion signal.
104+
* @param start the first integer value emitted, start of the range
105+
* @param end the end of the range, exclusive
106+
*/
107+
RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
108+
this.downstream = downstream;
109+
this.index = start;
110+
this.end = end;
111+
}
112+
113+
// This method will register inbound demand from our `Subscriber` and
114+
// validate it against rule 3.9 and rule 3.17
115+
@Override
116+
public void request(long n) {
117+
// Non-positive requests should be honored with IllegalArgumentException
118+
if (n <= 0L) {
119+
invalidRequest = new IllegalArgumentException("§3.9: non-positive requests are not allowed!");
120+
n = 1;
121+
}
122+
// Downstream requests are cumulative and may come from any thread
123+
for (;;) {
124+
long requested = get();
125+
long update = requested + n;
126+
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
127+
// we treat the signalled demand as "effectively unbounded"
128+
if (update < 0L) {
129+
update = Long.MAX_VALUE;
130+
}
131+
// atomically update the current requested amount
132+
if (compareAndSet(requested, update)) {
133+
// if there was no prior request amount, we start the emission loop
134+
if (requested == 0L) {
135+
emit(update);
136+
}
137+
break;
138+
}
139+
}
140+
}
141+
142+
// This handles cancellation requests, and is idempotent, thread-safe and not
143+
// synchronously performing heavy computations as specified in rule 3.5
144+
@Override
145+
public void cancel() {
146+
// Indicate to the emission loop it should stop.
147+
cancelled = true;
148+
}
149+
150+
void emit(long currentRequested) {
151+
// Load fields to avoid re-reading them from memory due to volatile accesses in the loop.
152+
Subscriber<? super Integer> downstream = this.downstream;
153+
int index = this.index;
154+
int end = this.end;
155+
int emitted = 0;
156+
157+
try {
158+
for (; ; ) {
159+
// Check if there was an invalid request and then report its exception
160+
// as mandated by rule 3.9. The stacktrace in it should
161+
// help locate the faulty logic in the Subscriber.
162+
Throwable invalidRequest = this.invalidRequest;
163+
if (invalidRequest != null) {
164+
// When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
165+
cancelled = true;
166+
167+
downstream.onError(invalidRequest);
168+
return;
169+
}
170+
171+
// Loop while the index hasn't reached the end and we haven't
172+
// emitted all that's been requested
173+
while (index != end && emitted != currentRequested) {
174+
// to make sure that we follow rule 1.8, 3.6 and 3.7
175+
// We stop if cancellation was requested.
176+
if (cancelled) {
177+
return;
178+
}
179+
180+
downstream.onNext(index);
181+
182+
// Increment the index for the next possible emission.
183+
index++;
184+
// Increment the emitted count to prevent overflowing the downstream.
185+
emitted++;
186+
}
187+
188+
// If the index reached the end, we complete the downstream.
189+
if (index == end) {
190+
// to make sure that we follow rule 1.8, 3.6 and 3.7
191+
// Unless cancellation was requested by the last onNext.
192+
if (!cancelled) {
193+
// We need to consider this `Subscription` as cancelled as per rule 1.6
194+
// Note, however, that this state is not observable from the outside
195+
// world and since we leave the loop with requested > 0L, any
196+
// further request() will never trigger the loop.
197+
cancelled = true;
198+
199+
downstream.onComplete();
200+
}
201+
return;
202+
}
203+
204+
// Did the requested amount change while we were looping?
205+
long freshRequested = get();
206+
if (freshRequested == currentRequested) {
207+
// Save where the loop has left off: the next value to be emitted
208+
this.index = index;
209+
// Atomically subtract the previously requested (also emitted) amount
210+
currentRequested = addAndGet(-currentRequested);
211+
// If there was no new request in between get() and addAndGet(), we simply quit
212+
// The next 0 to N transition in request() will trigger the next emission loop.
213+
if (currentRequested == 0L) {
214+
break;
215+
}
216+
// Looks like there were more async requests, reset the emitted count and continue.
217+
emitted = 0;
218+
} else {
219+
// Yes, avoid the atomic subtraction and resume.
220+
// emitted != currentRequest in this case and index
221+
// still points to the next value to be emitted
222+
currentRequested = freshRequested;
223+
}
224+
}
225+
} catch (Throwable ex) {
226+
// We can only get here if `onNext`, `onError` or `onComplete` threw, and they
227+
// are not allowed to according to 2.13, so we can only cancel and log here.
228+
// If `onError` throws an exception, this is a spec violation according to rule 1.9,
229+
// and all we can do is to log it.
230+
231+
// Make sure that we are cancelled, since we cannot do anything else
232+
// since the `Subscriber` is faulty.
233+
cancelled = true;
234+
235+
// We can't report the failure to onError as the Subscriber is unreliable.
236+
(new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " +
237+
"throwing an exception from onNext, onError or onComplete.", ex))
238+
.printStackTrace();
239+
}
240+
}
241+
}
242+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.example.unicast;
13+
14+
import org.reactivestreams.Publisher;
15+
import org.reactivestreams.example.unicast.RangePublisher;
16+
import org.reactivestreams.tck.*;
17+
18+
public class RangePublisherTest extends PublisherVerification<Integer> {
19+
public RangePublisherTest() {
20+
super(new TestEnvironment(50, 50));
21+
}
22+
23+
@Override
24+
public Publisher<Integer> createPublisher(long elements) {
25+
return new RangePublisher(1, (int)elements);
26+
}
27+
28+
@Override
29+
public Publisher<Integer> createFailedPublisher() {
30+
return null;
31+
}
32+
}

0 commit comments

Comments
 (0)