Skip to content

Commit ec7c3bd

Browse files
author
Stephane Maldini
committed
Amend with most of @ViktorLang suggestions
1 parent 1ae8ba0 commit ec7c3bd

File tree

1 file changed

+69
-73
lines changed

1 file changed

+69
-73
lines changed

README.md

+69-73
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ Implementations are free to implement additional features not covered by the spe
4444

4545
The API consists of the following components that are required to be provided by Reactive Stream implementations:
4646

47-
- Publisher
48-
- Subscriber
49-
- Subscription
50-
- Processor
47+
1. Publisher
48+
2. Subscriber
49+
3. Subscription
50+
4. Processor
5151

5252
A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
5353

@@ -60,63 +60,60 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
6060

6161
NOTE: The specifications below use binding words in CAPLOCKS from https://www.ietf.org/rfc/rfc2119.txt
6262

63-
#### Subscriber ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscriber.java))
63+
#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Publisher.java))
6464

6565
```java
66-
public interface Subscriber<T> {
67-
public void onSubscribe(Subscription s);
68-
public void onNext(T t);
69-
public void onError(Throwable t);
70-
public void onComplete();
66+
public interface Publisher<T> {
67+
public void subscribe(Subscriber<T> s);
7168
}
7269
````
7370

74-
- A `Subscriber` MUST NOT block a `Publisher` thread.
75-
- A `Subscriber` MUST signal demand via `Subscription.request` to receive notifications.
76-
- A `Subscriber` MAY behave synchronously or asynchronously but SHOULD NOT synchronously perform heavy computations in its methods (`onNext`, `onError`, `onComplete`, `onSubscribe`).
77-
- A `Subscriber.onNext(T t)` and `Subscriber.onSubscribe(Subscription s)` MUST NOT call any methods on the `Subscription`, the `Publisher` or any other `Publishers` or `Subscribers`.
78-
- A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription`, the `Publisher` or any other `Publishers` or `Subscribers`.
79-
- A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the event
80-
- A `Subscriber` MUST NOT accept an `onSubscribe` event if it already has an active Subscription. What exactly "not accepting" means is left to the implementation but should include behavior that makes the user aware of the usage error (e.g. by logging, throwing an exception or similar).
81-
- A `Subscriber` MUST call `Subscription.cancel()` during shutdown if it still has an active `Subscription`.
82-
- A `Subscriber` MUST ensure that all calls on a `Subscription` take place from the same thread or provide for respective external synchronization.
83-
- A `Subscriber` MUST be prepared to receive one or more `onNext` events after having called `Subscription.cancel()` if there are still requested elements pending.
84-
- A `Subscriber` MUST be prepared to receive an `onComplete` event with or without a preceding `Subscription.request(int n)` call.
85-
- A `Subscriber` MUST be prepared to receive an `onError` event with or without a preceding `Subscription.request(int n)` call.
86-
- A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before the processing of the respective events. I.e. the Subscriber must take care of properly publishing the event to its processing logic.
87-
88-
89-
#### Publisher ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Publisher.java))
71+
1. The number of `onNext` events emitted by a `Publisher` to a `Subscriber` MUST NOT exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`.
72+
2. A `Publisher` MAY send less events than requested and terminate the `Subscription` by calling `onComplete` or `onError`.
73+
3. Events sent to a `Subscriber` MUST be sent sequentially (no concurrent notifications).
74+
4. If a `Publisher` fails it MUST emit an `onError`.
75+
5. If a `Publisher` terminates successfully (finite stream) it MUST emit an `onComplete`.
76+
6. If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered canceled.
77+
7. Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further events can be sent.
78+
8. Upon receiving a `Subscription.cancel` request it SHOULD, as soon as it can, stop sending events.
79+
9. `Subscription`'s which have been canceled SHOULD NOT receive subsequent `onError` or `onComplete` events, but implementations will not be able to strictly guarantee this in all cases due to the intrinsic race condition between actions taken concurrently by `Publisher` and `Subscriber`.
80+
10. A `Publisher` SHOULD NOT throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
81+
11. The `Subscriber.onSubscribe` method on a given `Subscriber` instance MUST NOT be called more than once.
82+
12. The `Publisher.subscribe` method MAY be called as many times as wanted but MUST be with a different (based on object equality) Subscriber each time. It MUST reject the Subscription with a `java.lang.IllegalStateException` if the same Subscriber already has an active `Subscription` with this `Publisher`. The cause message SHOULD include a reference to this rule and/or quote the full rule.
83+
13. A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast.
84+
14. A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc...). If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`".
85+
15. A `Publisher` in `completed` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onComplete` on the given `Subscriber`
86+
16. A `Publisher` in `error` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with the error cause on the given `Subscriber`
87+
17. A `Publisher` in `shut-down` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message SHOULD include a reference to this rule and/or quote the full rule.
88+
18. A `Publisher` MUST support a pending element count up to 2^63-1 (java.lang.Long.MAX_VALUE) and provide for overflow protection.
89+
19. A `Publisher` MUST produce the same elements in the same sequence for all its subscribers. Producing the stream elements at (temporarily) differing rates to different subscribers is allowed.
90+
20. A `Publisher` MUST start producing with the oldest element still available for a new subscriber.
91+
92+
#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscriber.java))
9093
9194
```java
92-
public interface Publisher<T> {
93-
public void subscribe(Subscriber<T> s);
95+
public interface Subscriber<T> {
96+
public void onSubscribe(Subscription s);
97+
public void onNext(T t);
98+
public void onError(Throwable t);
99+
public void onComplete();
94100
}
95101
````
96102
97-
- The number of `onNext` events emitted by a `Publisher` to a `Subscriber` MUST NOT exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`.
98-
- A `Publisher` MAY send less events than requested and terminate the `Subscription` by calling `onComplete` or `onError`.
99-
- Events sent to a `Subscriber` MUST be sent sequentially (no concurrent notifications).
100-
- If a `Publisher` fails it MUST emit an `onError`.
101-
- If a `Publisher` terminates successfully (finite stream) it MUST emit an `onComplete`.
102-
- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered canceled.
103-
- Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further events can be sent.
104-
- Upon receiving a `Subscription.cancel` request it SHOULD stop sending events as soon as it can.
105-
- `Subscription`'s which have been canceled SHOULD NOT receive subsequent `onError` or `onComplete` events, but implementations will not be able to strictly guarantee this in all cases due to the intrinsic race condition between actions taken concurrently by `Publisher` and `Subscriber`.
106-
- A `Publisher` SHOULD NOT throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
107-
- The `Subscriber.onSubscribe` method on a given `Subscriber` instance MUST NOT be called more than once.
108-
- The `Publisher.subscribe` method MAY be called as many times as wanted but MUST be with a different `Subscriber` each time, based on object equality. It MUST reject the Subscription with a `java.lang.IllegalStateException` if the same Subscriber already has an active `Subscription` with this `Publisher`.
109-
- A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast.
110-
- A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc...). If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`".
111-
- A `Publisher` in `completed` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onComplete` on the given `Subscriber`
112-
- A `Publisher` in `error` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with the error cause on the given `Subscriber`
113-
- A `Publisher` in `shut-down` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with `java.lang.IllegalStateException` on the given `Subscriber`
114-
- A `Publisher` MUST support a pending element count up to 2^63-1 (java.lang.Long.MAX_VALUE) and provide for overflow protection.
115-
- A `Publisher` MUST produce the same elements in the same sequence for all its subscribers. Producing the stream elements at (temporarily) differing rates to different subscribers is allowed.
116-
- A `Publisher` MUST start producing with the oldest element still available for a new subscriber.
117-
118-
119-
#### Subscription ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscription.java))
103+
1. A `Subscriber` MUST NOT block a `Publisher` thread.
104+
2. A `Subscriber` MUST signal demand via `Subscription.request(int n)` to receive onNext notifications.
105+
3. A `Subscriber` MAY behave synchronously or asynchronously but SHOULD NOT synchronously perform heavy computations in its methods (`onNext`, `onError`, `onComplete`, `onSubscribe`).
106+
5. A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription`, the `Publisher` or any other `Publishers` or `Subscribers`.
107+
6. A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the event
108+
7. A `Subscriber` MUST NOT accept an `onSubscribe` event if it already has an active Subscription. What exactly "not accepting" means is left to the implementation but should include behavior that makes the user aware of the usage error (e.g. by logging, throwing an exception or similar).
109+
8. A `Subscriber` MUST call `Subscription.cancel()` if it is no longer valid to the `Publisher` without the `Publisher` having signalled `onError` or `onComplete`.
110+
9. A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization.
111+
10. A `Subscriber` MUST be prepared to receive one or more `onNext` events after having called `Subscription.cancel()` if there are still requested elements pending.
112+
11. A `Subscriber` MUST be prepared to receive an `onComplete` event with or without a preceding `Subscription.request(int n)` call.
113+
12. A `Subscriber` MUST be prepared to receive an `onError` event with or without a preceding `Subscription.request(int n)` call.
114+
13. A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before the processing of the respective events. I.e. the Subscriber must take care of properly publishing the event to its processing logic.
115+
116+
#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscription.java))
120117
121118
```java
122119
public interface Subscription {
@@ -125,33 +122,32 @@ public interface Subscription {
125122
}
126123
````
127124
128-
- A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`.
129-
- Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching.
130-
- The `Subscription.request` method MUST assume that it will be invoked synchronously and MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext`.
131-
- The `Subscription.request` method SHOULD NOT synchronously perform heavy computations.
132-
- The `Subscription.cancel` method MUST assume that it will be invoked synchronously and SHOULD NOT synchronously perform heavy computations.
133-
- When the `Subscription` is cancelled, `Subscription.request(int n)` MUST ignore the call.
134-
- When the `Subscription` is cancelled, `Subscription.cancel()` MUST ignore the call.
135-
- When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST register the given number of additional elements to be produced to the respective subscriber.
136-
- When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0.
137-
- When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onNext` on this (or other) subscriber(s) if and only if the next element is already available.
138-
- When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onComplete` or `onError` on this (or other) subscriber(s).
139-
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually cease to call any methods on the corresponding subscriber.
140-
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` instance is discouraged, but this specification does not mandate that it is disallowed since that would mean having to store previously canceled subscriptions indefinitely.
141-
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST shut itself down if the given Subscription is the last downstream `Subscription`. Explicitly adding "keep-alive" Subscribers SHOULD prevent automatic shutdown if required.
142-
143-
#### Processor ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Processor.java))
125+
1. A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`.
126+
2. Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching.
127+
3. The `Subscription.request` method MUST assume that it will be invoked synchronously and MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext`.
128+
4. The `Subscription.request` method SHOULD NOT synchronously perform heavy computations.
129+
5. The `Subscription.cancel` method MUST assume that it will be invoked synchronously and SHOULD NOT synchronously perform heavy computations.
130+
6. After the `Subscription` is cancelled, additional `Subscription.request(int n)` MUST be NOPs.
131+
7. After the `Subscription` is cancelled, additional `Subscription.cancel()` MUST be NOPs.
132+
8. When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST register the given number of additional elements to be produced to the respective subscriber.
133+
9. When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message SHOULD include a reference to this rule and/or quote the full rule.
134+
10. When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onNext` on this (or other) subscriber(s) if and only if the next element is already available.
135+
11. When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onComplete` or `onError` on this (or other) subscriber(s).
136+
12. When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually cease to call any methods on the corresponding subscriber.
137+
13. When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` instance is discouraged, but this specification does not mandate that it is disallowed since that would mean having to store previously canceled subscriptions indefinitely.
138+
14. When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST transition to a `shut-down` state [see 1.17] if the given `Subscription` is the last downstream `Subscription`. Explicitly adding "keep-alive" Subscribers SHOULD prevent automatic shutdown if required.
139+
140+
#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Processor.java))
144141
145142
```java
146143
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
147144
}
148145
````
149146
150-
- A `Processor` represents a processing stage—which is both a `Subscriber` and a `Publisher` and obeys the contracts of both.
151-
- A `Processor` must cancel its upstream Subscription if its last downstream Subscription has been cancelled
152-
- A `Processor` must immediately pass on `onError` events received from its upstream to its downstream
153-
- A `Processor` must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not
154-
requested anything yet
147+
1. A `Processor` represents a processing stage—which is both a `Subscriber` and a `Publisher` and obeys the contracts of both.
148+
2. A `Processor` must cancel its upstream Subscription if its last downstream Subscription has been cancelled.
149+
3. A `Processor` must immediately pass on `onError` events received from its upstream to its downstream.
150+
4. A `Processor` must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet.
155151
156152
### Asynchronous vs Synchronous Processing ###
157153
@@ -213,4 +209,4 @@ Subscribers signaling a demand for one element after the reception of an element
213209
214210
## Legal
215211
216-
This project is a collaboration between engineers from Netflix, Twitter, RedHat, Pivotal, Typesafe and many others. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`.
212+
This project is a collaboration between engineers from Netflix, Twitter, RedHat, Pivotal, Typesafe and many others. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`.

0 commit comments

Comments
 (0)