Skip to content

Commit 54220e8

Browse files
committed
Merge pull request #212 from reactive-streams/wip-202-signal-sequence-√
Attempt to clarify the signalling sequence in the spec
2 parents b56ee39 + 10b6db2 commit 54220e8

File tree

10 files changed

+218
-66
lines changed

10 files changed

+218
-66
lines changed

Diff for: README.md

+6-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
5757
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:
5858

5959
```
60-
onError | (onSubscribe onNext* (onError | onComplete)?)
60+
onSubscribe onNext* (onError | onComplete)?
6161
```
6262

63+
This means that `onSubscribe` is always signalled,
64+
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
65+
6366
#### NOTES
6467

6568
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
@@ -87,7 +90,7 @@ public interface Publisher<T> {
8790
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8891
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8992
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
90-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
93+
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
9194
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9295
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
9396
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
@@ -147,7 +150,7 @@ public interface Subscription {
147150
| <a name="3.11">11</a> | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). |
148151
| <a name="3.12">12</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. |
149152
| <a name="3.13">13</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. |
150-
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)].
153+
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.9](#1.9)].
151154
| <a name="3.15">15</a> | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
152155
| <a name="3.16">16</a> | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
153156
| <a name="3.17">17</a> | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. |

Diff for: examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109+
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.9
110+
@Override public void cancel() {}
111+
@Override public void request(long n) {}
112+
});
109113
terminateDueTo(t); // Here we send onError, obeying rule 1.09
110114
}
111115

@@ -177,7 +181,7 @@ private void terminateDueTo(final Throwable t) {
177181
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
178182
try {
179183
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber`
180-
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it.
184+
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it.
181185
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
182186
}
183187
}

Diff for: examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

+19-8
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ private final void handleOnSubscribe(final Subscription s) {
101101

102102
private final void handleOnNext(final T element) {
103103
if (!done) { // If we aren't already done
104-
if(subscription == null) { // Check for spec violation of 2.1
105-
(new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
104+
if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
// Check for spec violation of 2.1 and 1.09
106+
(new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
106107
} else {
107108
try {
108109
if (whenNext(element)) {
@@ -116,7 +117,7 @@ private final void handleOnNext(final T element) {
116117
done(); // This is legal according to rule 2.6
117118
}
118119
} catch(final Throwable t) {
119-
done();
120+
done();
120121
try {
121122
onError(t);
122123
} catch(final Throwable t2) {
@@ -130,21 +131,32 @@ private final void handleOnNext(final T element) {
130131

131132
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
132133
private void handleOnComplete() {
133-
done = true; // Obey rule 2.4
134-
whenComplete();
134+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
135+
// Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
136+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
137+
} else {
138+
done = true; // Obey rule 2.4
139+
whenComplete();
140+
}
135141
}
136142

137143
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
138144
private void handleOnError(final Throwable error) {
139-
done = true; // Obey rule 2.4
140-
whenError(error);
145+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
146+
// Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
147+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
148+
} else {
149+
done = true; // Obey rule 2.4
150+
whenError(error);
151+
}
141152
}
142153

143154
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144155

145156
@Override public final void onSubscribe(final Subscription s) {
146157
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147158
if (s == null) throw null;
159+
148160
signal(new OnSubscribe(s));
149161
}
150162

@@ -180,7 +192,6 @@ private void handleOnError(final Throwable error) {
180192
try {
181193
final Signal s = inboundSignals.poll(); // We take a signal off the queue
182194
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
183-
184195
// Below we simply unpack the `Signal`s and invoke the corresponding methods
185196
if (s instanceof OnNext<?>)
186197
handleOnNext(((OnNext<T>)s).next);

Diff for: examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

+37-25
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,32 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
4141
}
4242

4343
@Override public void onNext(final T element) {
44-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45-
if (element == null) throw null;
44+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
45+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
46+
} else {
47+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
48+
if (element == null) throw null;
4649

47-
if (!done) { // If we aren't already done
48-
try {
49-
if (foreach(element)) {
50-
try {
51-
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
52-
} catch(final Throwable t) {
53-
// Subscription.request is not allowed to throw according to rule 3.16
54-
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
50+
if (!done) { // If we aren't already done
51+
try {
52+
if (foreach(element)) {
53+
try {
54+
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
55+
} catch (final Throwable t) {
56+
// Subscription.request is not allowed to throw according to rule 3.16
57+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
58+
}
59+
} else {
60+
done();
5561
}
56-
} else {
62+
} catch (final Throwable t) {
5763
done();
58-
}
59-
} catch(final Throwable t) {
60-
done();
61-
try {
62-
onError(t);
63-
} catch(final Throwable t2) {
64-
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
65-
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
64+
try {
65+
onError(t);
66+
} catch (final Throwable t2) {
67+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
68+
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
69+
}
6670
}
6771
}
6872
}
@@ -86,14 +90,22 @@ private void done() {
8690
protected abstract boolean foreach(final T element);
8791

8892
@Override public void onError(final Throwable t) {
89-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90-
if (t == null) throw null;
91-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
92-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
93+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
94+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
95+
} else {
96+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
97+
if (t == null) throw null;
98+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
99+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
100+
}
93101
}
94102

95103
@Override public void onComplete() {
96-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
97-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
104+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
106+
} else {
107+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
108+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
109+
}
98110
}
99111
}

Diff for: examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivestreams.example.unicast;
22

3+
import java.lang.Override;
34
import java.util.Collections;
45
import java.util.Iterator;
56
import org.reactivestreams.Publisher;
@@ -30,7 +31,11 @@ public IterablePublisherTest() {
3031
}
3132

3233
@Override public Publisher<Integer> createErrorStatePublisher() {
33-
return null;
34+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
35+
@Override public Iterator<Integer> iterator() {
36+
throw new RuntimeException("Error state signal!");
37+
}
38+
}, e);
3439
}
3540

3641
@Override public long maxElementsFromPublisher() {

Diff for: examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.testng.annotations.AfterClass;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.Iterator;
1112

1213
@Test // Must be here for TestNG to find and run this, do not remove
1314
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
@@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
2526
}
2627

2728
@Override public Publisher<Integer> createErrorStatePublisher() {
28-
return null;
29+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
30+
@Override public Iterator<Integer> iterator() {
31+
throw new RuntimeException("Error state signal!");
32+
}
33+
}, e);
2934
}
3035

3136
@Override public long maxElementsFromPublisher() {

Diff for: tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,13 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl
277277
}
278278

279279
@Override @Test
280-
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
281-
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
280+
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
281+
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
282+
}
283+
284+
@Override @Test
285+
public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
286+
publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
282287
}
283288

284289
@Override @Test

0 commit comments

Comments
 (0)