Skip to content

Addressing typos and elaboration requests in the spec. #409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 16, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ followed by a possibly unbounded number of `onNext` signals (as requested by `Su
| <a name="term_terminal_state">Terminal state</a> | For a Publisher: When `onComplete` or `onError` has been signalled. For a Subscriber: When an `onComplete` or `onError` has been received.|
| <a name="term_nop">NOP</a> | Execution that has no detectable effect to the calling thread, and can as such safely be called any number of times.|
| <a name="term_ext_sync">External synchronization</a> | Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`. |
| <a name="term_thread-safe">Thread-safe</a> | Can be safely invoked synchronously, or asychronously, without requiring external synchronization to ensure program correctness. |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reactive-streams/contributors I'm sure this definition can be improved upon, so please suggest improvements. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it clear that this doesn't imply that it is safe to be called in parallel?


### SPECIFICATION

Expand All @@ -97,7 +98,7 @@ public interface Publisher<T> {
| [:bulb:](#1.1 "1.1 explained") | *The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.* |
| <a name="1.2">2</a> | A `Publisher` MAY signal fewer `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError`. |
| [:bulb:](#1.2 "1.2 explained") | *The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.* |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a `thread-safe` manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a [thread-safe](#term_thread-safe) manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reactive-streams/contributors Will the definition of thread-safe be in conflict with this clause about external synchronization or is it clear what it means here?

| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to make it clear that [external synchronization](#term_ext_sync) must be employed if the Publisher intends to send signals from multiple/different threads.* |
| <a name="1.4">4</a> | If a `Publisher` fails it MUST signal an `onError`. |
| [:bulb:](#1.4 "1.4 explained") | *The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.* |
Expand All @@ -108,7 +109,7 @@ public interface Publisher<T> {
| <a name="1.7">7</a> | Once a [terminal state](#term_terminal_state) has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
| [:bulb:](#1.7 "1.7 explained") | *The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.* |
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for *eventually* is because signals can have propagation delay due to being asynchronous.* |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for **eventually** is because signals can have propagation delay due to being asynchronous.* |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
Expand All @@ -132,7 +133,7 @@ public interface Subscriber<T> {
| <a name="2.1">1</a> | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals. |
| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.* |
| <a name="2.2">2</a> | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals. |
| [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from CPU cycles.* |
| [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.* |
| <a name="2.3">3</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher`. |
| [:bulb:](#2.3 "2.3 explained") | *The intent of this rule is to prevent cycles and race-conditions—between Publisher, Subscription and Subscriber—during the processing of completion signals.* |
| <a name="2.4">4</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal. |
Expand All @@ -144,7 +145,7 @@ public interface Subscriber<T> {
| <a name="2.7">7</a> | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective [external synchronization](#term_ext_sync). |
| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to establish that [external synchronization](#term_ext_sync) must be added if a Subscriber will be using a Subscription concurrently by two or more threads.* |
| <a name="2.8">8</a> | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately. |
| [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` the Publisher seeing that.* |
| [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` and the Publisher observing that cancellation.* |
| <a name="2.9">9</a> | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call. |
| [:bulb:](#2.9 "2.9 explained") | *The intent of this rule is to establish that completion is unrelated to the demand flow—this allows for streams which complete early, and obviates the need to *poll* for completion.* |
| <a name="2.10">10</a> | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. |
Expand Down Expand Up @@ -172,10 +173,10 @@ public interface Subscription {
| <a name="3.2">2</a> | The `Subscription` MUST allow the `Subscriber` to call `Subscription.request` synchronously from within `onNext` or `onSubscribe`. |
| [:bulb:](#3.2 "3.2 explained") | *The intent of this rule is to make it clear that implementations of `request` must be reentrant, to avoid stack overflows in the case of mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). This implies that Publishers can be `synchronous`, i.e. signalling `onNext`´s on the thread which calls `request`.* |
| <a name="3.3">3</a> | `Subscription.request` MUST place an upper bound on possible synchronous recursion between `Publisher` and `Subscriber`. |
| [:bulb:](#3.3 "3.3 explained") | *The intent of this rule is to complement [see [3.2](#3.2)] by placing an upper limit on the mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). Implementations are RECOMMENDED to limit this mutual recursion to a depth of `1` (ONE)—for the sake of conserving stack space. An example for undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, as it otherwise will result in blowing the calling Thread´s stack.* |
| [:bulb:](#3.3 "3.3 explained") | *The intent of this rule is to complement [see [3.2](#3.2)] by placing an upper limit on the mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). Implementations are RECOMMENDED to limit this mutual recursion to a depth of `1` (ONE)—for the sake of conserving stack space. An example for undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, as it otherwise will result in blowing the calling thread´s stack.* |
| <a name="3.4">4</a> | `Subscription.request` SHOULD respect the responsivity of its caller by returning in a timely manner. |
| [:bulb:](#3.4 "3.4 explained") | *The intent of this rule is to establish that `request` is intended to be a [non-obstructing](#term_non-obstructing) method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution.* |
| <a name="3.5">5</a> | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe. |
| <a name="3.5">5</a> | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be [thread-safe](#term_thread-safe). |
| [:bulb:](#3.5 "3.5 explained") | *The intent of this rule is to establish that `cancel` is intended to be a [non-obstructing](#term_non-obstructing) method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution. Furthermore, it is also important that it is possible to call it multiple times without any adverse effects.* |
| <a name="3.6">6</a> | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be [NOPs](#term_nop). |
| [:bulb:](#3.6 "3.6 explained") | *The intent of this rule is to establish a causal relationship between cancellation of a subscription and the subsequent non-operation of requesting more elements.* |
Expand Down Expand Up @@ -218,7 +219,7 @@ public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
| <a name="4.2">2</a> | A `Processor` MAY choose to recover an `onError` signal. If it chooses to do so, it MUST consider the `Subscription` cancelled, otherwise it MUST propagate the `onError` signal to its Subscribers immediately. |
| [:bulb:](#4.2 "4.2 explained") | *The intent of this rule is to inform that it’s possible for implementations to be more than simple transformations.* |

While not mandated, it can be a good idea to cancel a `Processors` upstream `Subscription` when/if its last `Subscriber` cancels their `Subscription`,
While not mandated, it can be a good idea to cancel a `Processor`´s upstream `Subscription` when/if its last `Subscriber` cancels their `Subscription`,
to let the cancellation signal propagate upstream.

### Asynchronous vs Synchronous Processing ###
Expand Down