Skip to content

Augments the size of Subscription.request(n) from int to long and adds a... #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ public interface Subscriber<T> {

| ID | Rule |
| ------ | ------------------------------------------------------------------------------------------------------ |
| 1 | A `Subscriber` MUST signal demand via `Subscription.request(int n)` to receive `onNext` signals |
| 1 | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals |
| 2 | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`'s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals |
| 3 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher` |
| 4 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal |
| 5 | A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription` |
| 6 | A `Subscriber` MUST call `Subscription.cancel()` if it is no longer valid to the `Publisher` without the `Publisher` having signaled `onError` or `onComplete` |
| 7 | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization |
| 8 | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see 3.12]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately |
| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(int n)` call |
| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(int n)` call |
| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call |
| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call |
| 11 | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [1] the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic |
| 12 | `Subscriber.onSubscribe` MUST NOT be called more than once (based on object equality) |
| 13 | A failing `onComplete` invocation (e.g. throwing an exception) is a specification violation and MUST signal `onError` with `java.lang.IllegalStateException`. The cause message MUST include a reference to this rule and/or quote the full rule |
Expand All @@ -137,18 +137,20 @@ public interface Subscription {
| 3 | `Subscription.request` MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` |
| 4 | `Subscription.request` SHOULD NOT synchronously perform heavy computations that would impact its caller's responsivity |
| 5 | `Subscription.cancel` MUST NOT synchronously perform heavy computations, MUST be idempotent and MUST be thread-safe |
| 6 | After the `Subscription` is cancelled, additional `Subscription.request(int n)` MUST be NOPs |
| 6 | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be NOPs |
| 7 | After the `Subscription` is cancelled, additional `Subscription.cancel()` MUST be NOPs |
| 8 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MUST register the given number of additional elements to be produced to the respective subscriber |
| 9 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule |
| 10 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MAY synchronously call `onNext` on this (or other) subscriber(s) |
| 11 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s) |
| 8 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST register the given number of additional elements to be produced to the respective subscriber |
| 9 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule |
| 10 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onNext` on this (or other) subscriber(s) |
| 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s) |
| 12 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. |
| 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see 2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously canceled subscriptions indefinitely |
| 14 | While the `Subscription` is not cancelled, invoking `Subscription.cancel` MAY cause the `Publisher` to transition into the `shut-down` state if no other `Subscription` exists at this point [see 1.17].
| 15 | `Subscription.cancel` MUST NOT throw an `Exception` and MUST signal `onError` to its `Subscriber` |
| 16 | `Subscription.request` MUST NOT throw an `Exception` and MUST signal `onError` to its `Subscriber` |
| 17 | A `Subscription MUST support an unbounded number of calls to request and MUST support a pending request count up to 2^63-1 (java.lang.Long.MAX_VALUE). If more than 2^63-1 are requested in pending then it MUST signal an onError with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message MUST include a reference to this rule and/or quote the full rule. |
| 17 | A `Subscription MUST support an unbounded number of calls to request and MUST support a pending request count up to 2^63-1 (java.lang.Long.MAX_VALUE). A pending request count of exactly 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the `Publisher` as `effectively unbounded`[1]. If more than 2^63-1 are requested in pending then it MUST signal an onError with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message MUST include a reference to this rule and/or quote the full rule. |

[1] : As it is not feasibly reachable with current or forseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1.

A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback.

Expand Down
5 changes: 3 additions & 2 deletions api/src/main/java/org/reactivestreams/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
* <p>
* It can be called however often and whenever needed.
* It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
* An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
* <p>
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
* <p>
Expand All @@ -21,7 +22,7 @@ public interface Subscription {
*
* @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
*/
public void request(int n);
public void request(long n);

/**
* Request the {@link Publisher} to stop sending data and clean up resources.
Expand Down