From a3005b779344bf8ed6047d73d89ad9b39bb1d1a7 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Fri, 16 Jan 2015 09:58:25 +0100 Subject: [PATCH] remove requirement to track demand beyond Long.MAX_VALUE fixes #196 Also terminate spec rule sentences with a full stop where missing. --- README.md | 104 +++++++++--------- .../unicast/AsyncIterablePublisher.java | 8 +- .../tck/IdentityProcessorVerification.java | 4 +- .../tck/PublisherVerification.java | 25 +++-- .../support/PublisherVerificationRules.java | 2 +- .../tck/PublisherVerificationTest.java | 29 +++-- 6 files changed, 89 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index a417cd44..ca4b94b2 100644 --- a/README.md +++ b/README.md @@ -76,21 +76,21 @@ public interface Publisher { } ```` -| ID | Rule | -| ------------------------- | ------------------------------------------------------------------------------------------------------ | -| 1 | The total number of `onNext` signals sent by a `Publisher` to a `Subscriber` MUST be less than or equal to the total number of elements requested by that `Subscriber`´s `Subscription` at all times | -| 2 | A `Publisher` MAY signal less `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError` | -| 3 | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled sequentially (no concurrent notifications) | -| 4 | If a `Publisher` fails it MUST signal an `onError` | -| 5 | If a `Publisher` terminates successfully (finite stream) it MUST signal an `onComplete` | -| 6 | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled | -| 7 | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur | -| 8 | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled | -| 9 | Calling `Publisher.subscribe` MUST return normally. The only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method | -| 10 | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)] | -| 11 | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast | -| 12 | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe` | -| 13 | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers | +| ID | Rule . | +| ------------------------- | ------------------------------------------------------------------------------------------------------. | +| 1 | The total number of `onNext` signals sent by a `Publisher` to a `Subscriber` MUST be less than or equal to the total number of elements requested by that `Subscriber`´s `Subscription` at all times. | +| 2 | A `Publisher` MAY signal less `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError`. | +| 3 | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled sequentially (no concurrent notifications). | +| 4 | If a `Publisher` fails it MUST signal an `onError`. | +| 5 | If a `Publisher` terminates successfully (finite stream) it MUST signal an `onComplete`. | +| 6 | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. | +| 7 | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. | +| 8 | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. | +| 9 | Calling `Publisher.subscribe` MUST return normally. The only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. | +| 10 | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. | +| 11 | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. | +| 12 | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. | +| 13 | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. | [1] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state. @@ -105,21 +105,21 @@ public interface Subscriber { } ```` -| ID | Rule | -| ------------------------- | ------------------------------------------------------------------------------------------------------ | -| 1 | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals | -| 2 | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`'s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals | -| 3 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher` | -| 4 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal | -| 5 | A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription` | -| 6 | A `Subscriber` MUST call `Subscription.cancel()` if it is no longer valid to the `Publisher` without the `Publisher` having signaled `onError` or `onComplete` | -| 7 | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization | -| 8 | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately | -| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call | -| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call | -| 11 | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [[1]](#footnote-2-1) the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic | -| 12 | `Subscriber.onSubscribe` MUST be called at most once for a given `Subscriber` (based on object equality) | -| 13 | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally. The only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment | +| ID | Rule . | +| ------------------------- | ------------------------------------------------------------------------------------------------------. | +| 1 | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals. | +| 2 | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`'s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals. | +| 3 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher`. | +| 4 | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal. | +| 5 | A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`. | +| 6 | A `Subscriber` MUST call `Subscription.cancel()` if it is no longer valid to the `Publisher` without the `Publisher` having signaled `onError` or `onComplete`. | +| 7 | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization. | +| 8 | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately. | +| 9 | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call. | +| 10 | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. | +| 11 | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [[1]](#footnote-2-1) the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic. | +| 12 | `Subscriber.onSubscribe` MUST be called at most once for a given `Subscriber` (based on object equality). | +| 13 | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally. The only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. | [1] : See JMM definition of Happen-Before in section 17.4.5. on http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html @@ -132,31 +132,31 @@ public interface Subscription { } ```` -| ID | Rule | -| ------------------------- | ------------------------------------------------------------------------------------------------------ | -| 1 | `Subscription.request` and `Subscription.cancel` MUST only be called inside of its `Subscriber` context. A `Subscription` represents the unique relationship between a `Subscriber` and a `Publisher` [see [2.12](#2.12)] | -| 2 | The `Subscription` MUST allow the `Subscriber` to call `Subscription.request` synchronously from within `onNext` or `onSubscribe` | -| 3 | `Subscription.request` MUST place an upper bound on possible synchronous recursion between `Publisher` and `Subscriber`[[1](#footnote-3-1)] | -| 4 | `Subscription.request` SHOULD respect the responsivity of its caller by returning in a timely manner[[2](#footnote-3-2)] | -| 5 | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner[[2](#footnote-3-2)], MUST be idempotent and MUST be thread-safe | -| 6 | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be NOPs | -| 7 | After the `Subscription` is cancelled, additional `Subscription.cancel()` MUST be NOPs | -| 8 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST register the given number of additional elements to be produced to the respective subscriber | -| 9 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST signal `onError` with a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule | -| 10 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onNext` on this (or other) subscriber(s) | -| 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s) | +| ID | Rule . | +| ------------------------- | ------------------------------------------------------------------------------------------------------. | +| 1 | `Subscription.request` and `Subscription.cancel` MUST only be called inside of its `Subscriber` context. A `Subscription` represents the unique relationship between a `Subscriber` and a `Publisher` [see [2.12](#2.12)]. | +| 2 | The `Subscription` MUST allow the `Subscriber` to call `Subscription.request` synchronously from within `onNext` or `onSubscribe`. | +| 3 | `Subscription.request` MUST place an upper bound on possible synchronous recursion between `Publisher` and `Subscriber`[[1](#footnote-3-1)]. | +| 4 | `Subscription.request` SHOULD respect the responsivity of its caller by returning in a timely manner[[2](#footnote-3-2)]. | +| 5 | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner[[2](#footnote-3-2)], MUST be idempotent and MUST be thread-safe. | +| 6 | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be NOPs. | +| 7 | After the `Subscription` is cancelled, additional `Subscription.cancel()` MUST be NOPs. | +| 8 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST register the given number of additional elements to be produced to the respective subscriber. | +| 9 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MUST signal `onError` with a `java.lang.IllegalArgumentException` if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule. | +| 10 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onNext` on this (or other) subscriber(s). | +| 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). | | 12 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. | -| 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely | +| 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. | | 14 | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.13](#1.13)]. -| 15 | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method | -| 16 | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method | -| 17 | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand of exactly 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as `effectively unbounded`[[3](#footnote-3-3)]. If demand becomes higher than 2^63-1 then the `Publisher` MUST signal an onError with `java.lang.IllegalStateException` on the given `Subscriber`. The cause message MUST include a reference to this rule and/or quote the full rule | +| 15 | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | +| 16 | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | +| 17 | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. | [1] : An example for undesirable synchronous, open recursion would be `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` -> …, as it very quickly would result in blowing the calling Thread´s stack. [2] : Avoid heavy computations and other things that would stall the caller´s thread of execution -[3] : As it is not feasibly reachable with current or forseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1. +[3] : As it is not feasibly reachable with current or foreseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1, it is allowed for a `Publisher` to stop tracking demand beyond this point. A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback. @@ -167,10 +167,10 @@ public interface Processor extends Subscriber, Publisher { } ```` -| ID | Rule | -| ------------------------ | ------------------------------------------------------------------------------------------------------ | -| 1 | A `Processor` represents a processing stage—which is both a `Subscriber` and a `Publisher` and MUST obey the contracts of both | -| 2 | A `Processor` MAY choose to recover an `onError` signal. If it chooses to do so, it MUST consider the `Subscription` cancelled, otherwise it MUST propagate the `onError` signal to its Subscribers immediately | +| ID | Rule . | +| ------------------------ | ------------------------------------------------------------------------------------------------------. | +| 1 | A `Processor` represents a processing stage—which is both a `Subscriber` and a `Publisher` and MUST obey the contracts of both. | +| 2 | A `Processor` MAY choose to recover an `onError` signal. If it chooses to do so, it MUST consider the `Subscription` cancelled, otherwise it MUST propagate the `onError` signal to its Subscribers immediately. | While not mandated, it can be a good idea to cancel a `Processors` upstream `Subscription` when/if its last `Subscriber` cancels their `Subscription`, to let the cancellation signal propagate upstream. diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java index ba64b92d..985144a8 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java @@ -81,9 +81,11 @@ final class SubscriptionImpl implements Subscription, Runnable { private void doRequest(final long n) { if (n < 1) terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.")); - else if (demand + n < 1) - terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 3.17 by demanding more elements than Long.MAX_VALUE.")); - else { + else if (demand + n < 1) { + // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded" + demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded" + doSend(); // Then we proceed with sending data downstream + } else { demand += n; // Here we record the downstream demand doSend(); // Then we can proceed with sending data downstream } diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 0b6fb72c..f2774795 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -362,8 +362,8 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa } @Override @Test - public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { - publisherVerification.required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue(); + public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { + publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); } // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4 diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 893a0802..a2d5eaa2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -940,7 +940,7 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa activePublisherTest(totalElements, true, new PublisherTestRun() { @Override public void run(Publisher pub) throws Throwable { - ManualSubscriber sub = env.newManualSubscriber(pub); + final ManualSubscriber sub = env.newManualSubscriber(pub); sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1 sub.request(1); // pending = Long.MAX_VALUE @@ -948,17 +948,22 @@ public void run(Publisher pub) throws Throwable { sub.nextElements(totalElements); sub.expectCompletion(); - env.verifyNoAsyncErrors(); + try { + env.verifyNoAsyncErrors(); + } finally { + sub.cancel(); + } + } }); } // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.17 @Override @Test - public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { + public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun() { @Override public void run(Publisher pub) throws Throwable { - ManualSubscriberWithSubscriptionSupport sub = new BlackholeSubscriberWithSubscriptionSupport(env) { + final ManualSubscriberWithSubscriptionSupport sub = new BlackholeSubscriberWithSubscriptionSupport(env) { // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls, // so 10 is relatively high and safe even if arbitrarily chosen int callsCounter = 10; @@ -982,10 +987,12 @@ public void onNext(T element) { // we're pretty sure to overflow from those sub.request(1); - sub.expectErrorWithMessage(IllegalStateException.class, "3.17"); - - // onError must be signalled only once, even with in-flight other request() messages that would trigger overflow again - env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); + // no onError should be signalled + try { + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); + } finally { + sub.cancel(); + } } }); } @@ -1105,4 +1112,4 @@ public void notVerified(String message) { throw new SkipException(message); } -} \ No newline at end of file +} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java index 134c60db..42c3ba7d 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java @@ -39,5 +39,5 @@ public interface PublisherVerificationRules { void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable; void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable; void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable; - void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable; + void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable; } \ No newline at end of file diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index 3511415e..ba557cfe 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -473,17 +473,7 @@ public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue_sho } @Test - public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onAsynchDemandIgnoringPublisher() throws Throwable { - final ExecutorService signallersPool = Executors.newFixedThreadPool(2); - requireTestFailure(new ThrowingRunnable() { - @Override public void run() throws Throwable { - demandIgnoringAsynchronousPublisherVerification(signallersPool).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue(); - } - }, "Expected onError(java.lang.IllegalStateException)"); - } - - @Test - public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchDemandIgnoringPublisher() throws Throwable { + public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchOverflowingPublisher() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { customPublisherVerification(new Publisher() { @@ -494,6 +484,12 @@ public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shoul @Override public void request(long n) { // it does not protect from demand overflow! demand += n; + if (demand < 0) { + // overflow + s.onError(new IllegalStateException("Illegally signalling onError (violates rule 3.17)")); // Illegally signal error + } else { + s.onNext(0); + } } @Override public void cancel() { @@ -501,13 +497,13 @@ public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shoul } }); } - }).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue(); + }).required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); } - }, "Expected onError(java.lang.IllegalStateException)"); + }, "Async error during test execution: Illegally signalling onError (violates rule 3.17)"); } @Test - public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFail_overflowingDemand() throws Throwable { + public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFailWhenErrorSignalledOnceMaxValueReached() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { customPublisherVerification(new Publisher() { @@ -520,7 +516,8 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa // this is a mistake, it should still be able to accumulate such demand if (demand == Long.MAX_VALUE) - s.onError(new IllegalStateException("I'm signalling onError too soon! Cumulative demand equal to Long.MAX_VALUE is OK by the spec.")); + s.onError(new IllegalStateException("Illegally signalling onError too soon! " + + "Cumulative demand equal to Long.MAX_VALUE is legal.")); s.onNext(0); } @@ -528,7 +525,7 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa } }).required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(); } - }, "Async error during test execution: I'm signalling onError too soon!"); + }, "Async error during test execution: Illegally signalling onError too soon!"); }