@@ -40,16 +40,37 @@ public RangePublisher(int start, int count) {
40
40
}
41
41
42
42
@ Override
43
- public void subscribe (Subscriber <? super Integer > s ) {
44
- s .onSubscribe (new RangeSubscription (s , start , start + count ));
43
+ public void subscribe (Subscriber <? super Integer > subscriber ) {
44
+ // As per rule 1.11, we have decided to support multiple subscribers
45
+ // in a unicast configuration for this `Publisher` implementation.
46
+
47
+ // As per rule 1.09, we need to throw a `java.lang.NullPointerException`
48
+ // if the `Subscriber` is `null`
49
+ if (subscriber == null ) throw null ;
50
+
51
+ // As per 2.13, this method must return normally (i.e. not throw).
52
+ try {
53
+ subscriber .onSubscribe (new RangeSubscription (subscriber , start , start + count ));
54
+ } catch (Throwable ex ) {
55
+ new IllegalStateException (subscriber + " violated the Reactive Streams rule 2.13 " +
56
+ "by throwing an exception from onSubscribe." , ex )
57
+ // When onSubscribe fails this way, we don't know what state the
58
+ // subscriber is thus calling onError may cause more crashes.
59
+ .printStackTrace ();
60
+ }
45
61
}
46
62
47
63
/**
48
64
* A Subscription implementation that holds the current downstream
49
65
* requested amount and responds to the downstream's request() and
50
66
* cancel() calls.
51
67
*/
52
- static final class RangeSubscription extends AtomicLong implements Subscription {
68
+ static final class RangeSubscription
69
+ // We are using this `AtomicLong` to make sure that this `Subscription`
70
+ // doesn't run concurrently with itself, which would violate rule 1.3
71
+ // among others (no concurrent notifications).
72
+ // The atomic transition from 0L to N > 0L will ensure this.
73
+ extends AtomicLong implements Subscription {
53
74
54
75
private static final long serialVersionUID = -9000845542177067735L ;
55
76
@@ -89,6 +110,8 @@ static final class RangeSubscription extends AtomicLong implements Subscription
89
110
this .end = end ;
90
111
}
91
112
113
+ // This method will register inbound demand from our `Subscriber` and
114
+ // validate it against rule 3.9 and rule 3.17
92
115
@ Override
93
116
public void request (long n ) {
94
117
// Non-positive requests should be honored with IllegalArgumentException
@@ -100,7 +123,8 @@ public void request(long n) {
100
123
for (;;) {
101
124
long requested = get ();
102
125
long update = requested + n ;
103
- // cap the amount at Long.MAX_VALUE
126
+ // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
127
+ // we treat the signalled demand as "effectively unbounded"
104
128
if (update < 0L ) {
105
129
update = Long .MAX_VALUE ;
106
130
}
@@ -115,6 +139,8 @@ public void request(long n) {
115
139
}
116
140
}
117
141
142
+ // This handles cancellation requests, and is idempotent, thread-safe and not
143
+ // synchronously performing heavy computations as specified in rule 3.5
118
144
@ Override
119
145
public void cancel () {
120
146
// Indicate to the emission loop it should stop.
@@ -128,59 +154,86 @@ void emit(long currentRequested) {
128
154
int end = this .end ;
129
155
int emitted = 0 ;
130
156
131
- for (;;) {
132
- // Check if there was an invalid request and then report it.
133
- Throwable invalidRequest = this . invalidRequest ;
134
- if ( invalidRequest != null ) {
135
- downstream . onError (invalidRequest );
136
- return ;
137
- }
157
+ try {
158
+ for (; ; ) {
159
+ // Check if there was an invalid request and then report it.
160
+ Throwable invalidRequest = this . invalidRequest ;
161
+ if (invalidRequest != null ) {
162
+ // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
163
+ cancelled = true ;
138
164
139
- // Loop while the index hasn't reached the end and we haven't
140
- // emitted all that's been requested
141
- while (index != end && emitted != currentRequested ) {
142
- // We stop if cancellation was requested
143
- if (cancelled ) {
165
+ downstream .onError (invalidRequest );
144
166
return ;
145
167
}
146
168
147
- downstream .onNext (index );
148
-
149
- // Increment the index for the next possible emission.
150
- index ++;
151
- // Increment the emitted count to prevent overflowing the downstream.
152
- emitted ++;
153
- }
169
+ // Loop while the index hasn't reached the end and we haven't
170
+ // emitted all that's been requested
171
+ while (index != end && emitted != currentRequested ) {
172
+ // to make sure that we follow rule 1.8, 3.6 and 3.7
173
+ // We stop if cancellation was requested.
174
+ if (cancelled ) {
175
+ return ;
176
+ }
177
+
178
+ downstream .onNext (index );
179
+
180
+ // Increment the index for the next possible emission.
181
+ index ++;
182
+ // Increment the emitted count to prevent overflowing the downstream.
183
+ emitted ++;
184
+ }
154
185
155
- // If the index reached the end, we complete the downstream.
156
- if (index == end ) {
157
- // Unless cancellation was requested by the last onNext.
158
- if (!cancelled ) {
159
- downstream .onComplete ();
186
+ // If the index reached the end, we complete the downstream.
187
+ if (index == end ) {
188
+ // to make sure that we follow rule 1.8, 3.6 and 3.7
189
+ // Unless cancellation was requested by the last onNext.
190
+ if (!cancelled ) {
191
+ // We need to consider this `Subscription` as cancelled as per rule 1.6
192
+ // Note, however, that this state is not observable from the outside
193
+ // world and since we leave the loop with requested > 0L, any
194
+ // further request() will never trigger the loop.
195
+ cancelled = true ;
196
+
197
+ downstream .onComplete ();
198
+ }
199
+ return ;
160
200
}
161
- return ;
162
- }
163
201
164
- // Did the requested amount change while we were looping?
165
- long freshRequested = get ();
166
- if (freshRequested == currentRequested ) {
167
- // Save where the loop has left off: the next value to be emitted
168
- this .index = index ;
169
- // Atomically subtract the previously requested (also emitted) amount
170
- currentRequested = addAndGet (-currentRequested );
171
- // If there was no new request in between get() and addAndGet(), we simply quit
172
- // The next 0 to N transition in request() will trigger the next emission loop.
173
- if (currentRequested == 0L ) {
174
- break ;
202
+ // Did the requested amount change while we were looping?
203
+ long freshRequested = get ();
204
+ if (freshRequested == currentRequested ) {
205
+ // Save where the loop has left off: the next value to be emitted
206
+ this .index = index ;
207
+ // Atomically subtract the previously requested (also emitted) amount
208
+ currentRequested = addAndGet (-currentRequested );
209
+ // If there was no new request in between get() and addAndGet(), we simply quit
210
+ // The next 0 to N transition in request() will trigger the next emission loop.
211
+ if (currentRequested == 0L ) {
212
+ break ;
213
+ }
214
+ // Looks like there were more async requests, reset the emitted count and continue.
215
+ emitted = 0 ;
216
+ } else {
217
+ // Yes, avoid the atomic subtraction and resume.
218
+ // emitted != currentRequest in this case and index
219
+ // still points to the next value to be emitted
220
+ currentRequested = freshRequested ;
175
221
}
176
- // Looks like there were more async requests, reset the emitted count and continue.
177
- emitted = 0 ;
178
- } else {
179
- // Yes, avoid the atomic subtraction and resume.
180
- // emitted != currentRequest in this case and index
181
- // still points to the next value to be emitted
182
- currentRequested = freshRequested ;
183
222
}
223
+ } catch (Throwable ex ) {
224
+ // We can only get here if `onNext`, `onError` or `onComplete` threw, and they
225
+ // are not allowed to according to 2.13, so we can only cancel and log here.
226
+ // If `onError` throws an exception, this is a spec violation according to rule 1.9,
227
+ // and all we can do is to log it.
228
+
229
+ // Make sure that we are cancelled, since we cannot do anything else
230
+ // since the `Subscriber` is faulty.
231
+ cancelled = true ;
232
+
233
+ // We can't report the failure to onError as the Subscriber is unreliable.
234
+ (new IllegalStateException (downstream + " violated the Reactive Streams rule 2.13 by " +
235
+ "throwing an exception from onNext, onError or onComplete." , ex ))
236
+ .printStackTrace ();
184
237
}
185
238
}
186
239
}
0 commit comments