From 0c49c7f14936644b8780ac885e411186abc31e35 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 14 Jul 2014 13:12:20 +0200 Subject: [PATCH] Augments the size of Subscription.request(n) from int to long and adds a spec provision that a cumulative demand of Long.MAX_VALUE MAY be treated as 'effectively unbounded' --- README.md | 20 ++++++++++--------- .../org/reactivestreams/Subscription.java | 5 +++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index ae2e8111..5a30f488 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ public interface Subscriber { | ID | Rule | | ------ | ------------------------------------------------------------------------------------------------------ | -| 1 | A `Subscriber` MUST signal demand via `Subscription.request(int n)` to receive `onNext` signals | +| 1 | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals | | 2 | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`'s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals | | 3 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher` | | 4 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal | @@ -112,8 +112,8 @@ public interface Subscriber { | 6 | A `Subscriber` MUST call `Subscription.cancel()` if it is no longer valid to the `Publisher` without the `Publisher` having signaled `onError` or `onComplete` | | 7 | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization | | 8 | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see 3.12]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately | -| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(int n)` call | -| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(int n)` call | +| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call | +| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call | | 11 | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [1] the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic | | 12 | `Subscriber.onSubscribe` MUST NOT be called more than once (based on object equality) | | 13 | A failing `onComplete` invocation (e.g. throwing an exception) is a specification violation and MUST signal `onError` with `java.lang.IllegalStateException`. The cause message MUST include a reference to this rule and/or quote the full rule | @@ -137,18 +137,20 @@ public interface Subscription { | 3 | `Subscription.request` MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` | | 4 | `Subscription.request` SHOULD NOT synchronously perform heavy computations that would impact its caller's responsivity | | 5 | `Subscription.cancel` MUST NOT synchronously perform heavy computations, MUST be idempotent and MUST be thread-safe | -| 6 | After the `Subscription` is cancelled, additional `Subscription.request(int n)` MUST be NOPs | +| 6 | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be NOPs | | 7 | After the `Subscription` is cancelled, additional `Subscription.cancel()` MUST be NOPs | -| 8 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MUST register the given number of additional elements to be produced to the respective subscriber | -| 9 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule | -| 10 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MAY synchronously call `onNext` on this (or other) subscriber(s) | -| 11 | While the `Subscription` is not cancelled, `Subscription.request(int n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s) | +| 8 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST register the given number of additional elements to be produced to the respective subscriber | +| 9 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule | +| 10 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onNext` on this (or other) subscriber(s) | +| 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s) | | 12 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. | | 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see 2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously canceled subscriptions indefinitely | | 14 | While the `Subscription` is not cancelled, invoking `Subscription.cancel` MAY cause the `Publisher` to transition into the `shut-down` state if no other `Subscription` exists at this point [see 1.17]. | 15 | `Subscription.cancel` MUST NOT throw an `Exception` and MUST signal `onError` to its `Subscriber` | | 16 | `Subscription.request` MUST NOT throw an `Exception` and MUST signal `onError` to its `Subscriber` | -| 17 | A `Subscription MUST support an unbounded number of calls to request and MUST support a pending request count up to 2^63-1 (java.lang.Long.MAX_VALUE). If more than 2^63-1 are requested in pending then it MUST signal an onError with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message MUST include a reference to this rule and/or quote the full rule. | +| 17 | A `Subscription MUST support an unbounded number of calls to request and MUST support a pending request count up to 2^63-1 (java.lang.Long.MAX_VALUE). A pending request count of exactly 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the `Publisher` as `effectively unbounded`[1]. If more than 2^63-1 are requested in pending then it MUST signal an onError with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message MUST include a reference to this rule and/or quote the full rule. | + +[1] : As it is not feasibly reachable with current or forseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1. A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback. diff --git a/api/src/main/java/org/reactivestreams/Subscription.java b/api/src/main/java/org/reactivestreams/Subscription.java index 402893f8..bab4236d 100644 --- a/api/src/main/java/org/reactivestreams/Subscription.java +++ b/api/src/main/java/org/reactivestreams/Subscription.java @@ -12,7 +12,8 @@ public interface Subscription { /** * No events will be sent by a {@link Publisher} until demand is signaled via this method. *

- * It can be called however often and whenever needed. + * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE. + * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded". *

* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. *

@@ -21,7 +22,7 @@ public interface Subscription { * * @param n the strictly positive number of elements to requests to the upstream {@link Publisher} */ - public void request(int n); + public void request(long n); /** * Request the {@link Publisher} to stop sending data and clean up resources.