Skip to content

Commit 2dfe6d5

Browse files
committed
Example synchronous range Publisher
1 parent 306ae92 commit 2dfe6d5

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.example.lazycast;
13+
14+
import org.reactivestreams.*;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
18+
/**
19+
* A synchronous implementation of the {@link Publisher} that can
20+
* be subscribed to multiple times and each individual subscription
21+
* will receive range of monotonically increasing integer values on demand.
22+
*/
23+
public final class RangePublisher implements Publisher<Integer> {
24+
25+
/** The starting value of the range. */
26+
final int start;
27+
28+
/** The number of items to emit. */
29+
final int count;
30+
31+
/**
32+
* Constructs a RangePublisher instance with the given start and count values
33+
* that yields a sequence of [start, start + count).
34+
* @param start the starting value of the range
35+
* @param count the number of items to emit
36+
*/
37+
public RangePublisher(int start, int count) {
38+
this.start = start;
39+
this.count = count;
40+
}
41+
42+
@Override
43+
public void subscribe(Subscriber<? super Integer> s) {
44+
s.onSubscribe(new RangeSubscription(s, start, start + count));
45+
}
46+
47+
/**
48+
* A Subscription implementation that holds the current downstream
49+
* requested amount and responds to the downstream's request() and
50+
* cancel() calls.
51+
*/
52+
static final class RangeSubscription extends AtomicLong implements Subscription {
53+
54+
private static final long serialVersionUID = -9000845542177067735L;
55+
56+
/** The Subscriber we are emitting integer values to. */
57+
final Subscriber<? super Integer> downstream;
58+
59+
/** The end index (exclusive). */
60+
final int end;
61+
62+
/**
63+
* The current index and within the [start, start + count) range that
64+
* will be emitted as downstream.onNext().
65+
*/
66+
int index;
67+
68+
/**
69+
* Indicates the emission should stop.
70+
*/
71+
volatile boolean cancelled;
72+
73+
/**
74+
* Holds onto the IllegalArgumentException (containing the offending stacktrace)
75+
* indicating there was a non-positive request() call from the downstream.
76+
*/
77+
volatile Throwable invalidRequest;
78+
79+
/**
80+
* Constructs a stateful RangeSubscription that emits signals to the given
81+
* downstream from an integer range of [start, end).
82+
* @param downstream the Subscriber receiving the integer values and the completion signal.
83+
* @param start the first integer value emitted, start of the range
84+
* @param end the end of the range, exclusive
85+
*/
86+
RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
87+
this.downstream = downstream;
88+
this.index = start;
89+
this.end = end;
90+
}
91+
92+
@Override
93+
public void request(long n) {
94+
// Non-positive requests should be honored with IllegalArgumentException
95+
if (n <= 0L) {
96+
invalidRequest = new IllegalArgumentException("§3.9: non-positive requests are not allowed!");
97+
n = 1;
98+
}
99+
// Downstream requests are cumulative and may come from any thread
100+
for (;;) {
101+
long requested = get();
102+
long update = requested + n;
103+
// cap the amount at Long.MAX_VALUE
104+
if (update < 0L) {
105+
update = Long.MAX_VALUE;
106+
}
107+
// atomically update the current requested amount
108+
if (compareAndSet(requested, update)) {
109+
// if there was no prior request amount, we start the emission loop
110+
if (requested == 0L) {
111+
emit(update);
112+
}
113+
break;
114+
}
115+
}
116+
}
117+
118+
@Override
119+
public void cancel() {
120+
// Indicate to the emission loop it should stop.
121+
cancelled = true;
122+
}
123+
124+
void emit(long currentRequested) {
125+
// Load fields to avoid re-reading them from memory due to volatile accesses in the loop.
126+
Subscriber<? super Integer> downstream = this.downstream;
127+
int index = this.index;
128+
int end = this.end;
129+
int emitted = 0;
130+
131+
for (;;) {
132+
// Check if there was an invalid request and then report it.
133+
Throwable invalidRequest = this.invalidRequest;
134+
if (invalidRequest != null) {
135+
downstream.onError(invalidRequest);
136+
return;
137+
}
138+
139+
// Loop while the index hasn't reached the end and we haven't
140+
// emitted all that's been requested
141+
while (index != end && emitted != currentRequested) {
142+
// We stop if cancellation was requested
143+
if (cancelled) {
144+
return;
145+
}
146+
147+
downstream.onNext(index);
148+
149+
// Increment the index for the next possible emission.
150+
index++;
151+
// Increment the emitted count to prevent overflowing the downstream.
152+
emitted++;
153+
}
154+
155+
// If the index reached the end, we complete the downstream.
156+
if (index == end) {
157+
// Unless cancellation was requested by the last onNext.
158+
if (!cancelled) {
159+
downstream.onComplete();
160+
}
161+
return;
162+
}
163+
164+
// Did the requested amount change while we were looping?
165+
long freshRequested = get();
166+
if (freshRequested == currentRequested) {
167+
// Save where the loop has left off: the next value to be emitted
168+
this.index = index;
169+
// Atomically subtract the previously requested (also emitted) amount
170+
currentRequested = addAndGet(-currentRequested);
171+
// If there was no new request in between get() and addAndGet(), we simply quit
172+
// The next 0 to N transition in request() will trigger the next emission loop.
173+
if (currentRequested == 0L) {
174+
break;
175+
}
176+
// Looks like there were more async requests, reset the emitted count and continue.
177+
emitted = 0;
178+
} else {
179+
// Yes, avoid the atomic subtraction and resume.
180+
// emitted != currentRequest in this case and index
181+
// still points to the next value to be emitted
182+
currentRequested = freshRequested;
183+
}
184+
}
185+
}
186+
}
187+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.example.lazycast;
13+
14+
import org.reactivestreams.Publisher;
15+
import org.reactivestreams.tck.*;
16+
17+
public class RangePublisherTest extends PublisherVerification<Integer> {
18+
public RangePublisherTest() {
19+
super(new TestEnvironment(50, 50));
20+
}
21+
22+
@Override
23+
public Publisher<Integer> createPublisher(long elements) {
24+
return new RangePublisher(1, (int)elements);
25+
}
26+
27+
@Override
28+
public Publisher<Integer> createFailedPublisher() {
29+
return null;
30+
}
31+
}

0 commit comments

Comments
 (0)