From 2b1c7e2e027ed6d95d9347327a6954db63410047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 23 Nov 2017 19:20:44 +0100 Subject: [PATCH 1/6] Allow request-coordinating Processor impls to pass the TCK --- .../tck/IdentityProcessorVerification.java | 72 +++- .../tck/LockstepProcessorTest.java | 337 ++++++++++++++++++ 2 files changed, 393 insertions(+), 16 deletions(-) create mode 100644 tck/src/test/java/org/reactivestreams/tck/LockstepProcessorTest.java diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 2fac7fe7..573c0298 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -189,6 +189,16 @@ public long maxSupportedSubscribers() { return Long.MAX_VALUE; } + /** + * Override this method and return {@code true} if the {@link Processor} returned by the + * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s + * request amounts and only delivers onNext signals if all Subscribers have + * indicated (via their Subscription#request(long)) they are ready to receive elements. + */ + public boolean doesCoordinatedEmission() { + return false; + } + ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// @BeforeMethod @@ -415,17 +425,33 @@ public TestSetup apply(Long aLong) throws Throwable { final ManualSubscriberWithErrorCollection sub2 = new ManualSubscriberWithErrorCollection(env); env.subscribe(processor, sub2); - sub1.request(1); - expectRequest(); - final T x = sendNextTFromUpstream(); - expectNextElement(sub1, x); - sub1.request(1); + final Exception ex = new RuntimeException("Test exception"); - // sub1 has received one element, and has one demand pending - // sub2 has not yet requested anything + if (doesCoordinatedEmission()) { + sub1.request(1); + sub2.request(1); - final Exception ex = new RuntimeException("Test exception"); - sendError(ex); + final T x = sendNextTFromUpstream(); + + expectNextElement(sub1, x); + expectNextElement(sub2, x); + + sub1.request(1); + sub2.request(1); + + sendError(ex); + } else { + sub1.request(1); + expectRequest(); + final T x = sendNextTFromUpstream(); + expectNextElement(sub1, x); + sub1.request(1); + + // sub1 has received one element, and has one demand pending + // sub2 has not yet requested anything + + sendError(ex); + } sub1.expectError(ex); sub2.expectError(ex); @@ -673,15 +699,29 @@ public TestSetup apply(Long subscribers) throws Throwable { // sub1 now has 18 pending // sub2 has 0 pending - final T z = sendNextTFromUpstream(); - expectNextElement(sub1, z); - sub2.expectNone(); // since sub2 hasn't requested anything yet + if (doesCoordinatedEmission()) { + sub2.expectNone(); // since sub2 hasn't requested anything yet - sub2.request(1); - expectNextElement(sub2, z); + sub2.request(1); - if (totalRequests == 3) { - expectRequest(); + final T z = sendNextTFromUpstream(); + expectNextElement(sub1, z); + expectNextElement(sub2, z); + + if (totalRequests == 3) { + expectRequest(); + } + } else { + final T z = sendNextTFromUpstream(); + expectNextElement(sub1, z); + sub2.expectNone(); // since sub2 hasn't requested anything yet + + sub2.request(1); + expectNextElement(sub2, z); + + if (totalRequests == 3) { + expectRequest(); + } } // to avoid error messages during test harness shutdown diff --git a/tck/src/test/java/org/reactivestreams/tck/LockstepProcessorTest.java b/tck/src/test/java/org/reactivestreams/tck/LockstepProcessorTest.java new file mode 100644 index 00000000..b207d6a1 --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/LockstepProcessorTest.java @@ -0,0 +1,337 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck; + +import org.reactivestreams.*; +import org.testng.annotations.Test; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +@Test +public class LockstepProcessorTest extends IdentityProcessorVerification { + + public LockstepProcessorTest() { + super(new TestEnvironment()); + } + @Override + public Processor createIdentityProcessor(int bufferSize) { + return new LockstepProcessor(); + } + + @Override + public Publisher createFailedPublisher() { + LockstepProcessor proc = new LockstepProcessor(); + proc.onError(new Exception()); + return proc; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public long maxSupportedSubscribers() { + return 2; + } + + @Override + public boolean doesCoordinatedEmission() { + return true; + } + + static final class LockstepProcessor implements Processor { + + final AtomicReference[]> subscribers = + new AtomicReference[]>(EMPTY); + + static final LockstepSubscription[] EMPTY = new LockstepSubscription[0]; + static final LockstepSubscription[] TERMINATED = new LockstepSubscription[0]; + + volatile boolean done; + Throwable error; + + final AtomicReference upstream = + new AtomicReference(); + + final AtomicReferenceArray queue = + new AtomicReferenceArray(BUFFER_MASK + 1); + + final AtomicLong producerIndex = new AtomicLong(); + + final AtomicLong consumerIndex = new AtomicLong(); + + final AtomicInteger wip = new AtomicInteger(); + + static final int BUFFER_MASK = 127; + + int consumed; + + @Override + public void subscribe(Subscriber s) { + LockstepSubscription subscription = new LockstepSubscription(s, this); + s.onSubscribe(subscription); + if (add(subscription)) { + if (subscription.isCancelled()) { + remove(subscription); + } else { + drain(); + } + } else { + Throwable ex = error; + if (ex != null) { + s.onError(ex); + } else { + s.onComplete(); + } + } + } + + boolean add(LockstepSubscription sub) { + for (;;) { + LockstepSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + LockstepSubscription[] b = new LockstepSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = sub; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + void remove(LockstepSubscription sub) { + for (;;) { + LockstepSubscription[] a = subscribers.get(); + int n = a.length; + + if (n == 0) { + break; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == sub) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + LockstepSubscription[] b; + if (n == 1) { + b = TERMINATED; + } else { + b = new LockstepSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (subscribers.compareAndSet(a, b)) { + if (b == TERMINATED) { + Subscription s = upstream.getAndSet(CancelledSubscription.INSTANCE); + if (s != null) { + s.cancel(); + } + } + break; + } + } + } + + @Override + public void onSubscribe(Subscription s) { + if (upstream.compareAndSet(null, s)) { + s.request(BUFFER_MASK + 1); + } else { + s.cancel(); + } + } + + @Override + public void onNext(T t) { + if (t == null) { + throw new NullPointerException("t == null"); + } + long pi = producerIndex.get(); + queue.lazySet((int)pi & BUFFER_MASK, t); + producerIndex.lazySet(pi + 1); + drain(); + } + + @Override + public void onError(Throwable t) { + if (t == null) { + throw new NullPointerException("t == null"); + } + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int limit = (BUFFER_MASK + 1) - ((BUFFER_MASK + 1) >> 2); + int missed = 1; + for (;;) { + + for (;;) { + LockstepSubscription[] subscribers = this.subscribers.get(); + int n = subscribers.length; + + long ci = consumerIndex.get(); + + boolean d = done; + boolean empty = producerIndex.get() == ci; + + if (d) { + Throwable ex = error; + if (ex != null) { + for (LockstepSubscription sub : this.subscribers.getAndSet(TERMINATED)) { + sub.subscriber.onError(ex); + } + break; + } else if (empty) { + for (LockstepSubscription sub : this.subscribers.getAndSet(TERMINATED)) { + sub.subscriber.onComplete(); + } + break; + } + } + + if (n != 0 && !empty) { + long ready = Long.MAX_VALUE; + int c = 0; + for (LockstepSubscription sub : subscribers) { + long req = sub.get(); + if (req != Long.MIN_VALUE) { + ready = Math.min(ready, req - sub.emitted); + c++; + } + } + + if (ready != 0 && c != 0) { + int offset = (int) ci & BUFFER_MASK; + T value = queue.get(offset); + queue.lazySet(offset, null); + consumerIndex.lazySet(ci + 1); + + for (LockstepSubscription sub : subscribers) { + sub.subscriber.onNext(value); + sub.emitted++; + } + + if (++consumed == limit) { + consumed = 0; + upstream.get().request(limit); + } + } else { + break; + } + } else { + break; + } + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class LockstepSubscription extends AtomicLong + implements Subscription { + + final Subscriber subscriber; + + final LockstepProcessor parent; + + long emitted; + + LockstepSubscription(Subscriber subscriber, LockstepProcessor parent) { + this.subscriber = subscriber; + this.parent = parent; + } + + @Override + public void request(long n) { + if (n <= 0L) { + cancel(); + subscriber.onError(new IllegalArgumentException("ยง3.9 violated: positive request amount required")); + return; + } + for (;;) { + long current = get(); + if (current == Long.MIN_VALUE || current == Long.MAX_VALUE) { + break; + } + + long updated = current + n; + if (updated < 0L) { + updated = Long.MAX_VALUE; + } + if (compareAndSet(current, updated)) { + parent.drain(); + break; + } + } + } + + @Override + public void cancel() { + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + parent.drain(); + } + } + + boolean isCancelled() { + return get() == Long.MIN_VALUE; + } + } + } + + enum CancelledSubscription implements Subscription { + + INSTANCE; + + @Override + public void request(long n) { + // Subscription already cancelled + } + + @Override + public void cancel() { + // Subscription already cancelled + } + } +} From 09fe154d5025e294f1746714d4fb0b0cd5fcfdba Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 24 Nov 2017 12:00:55 +0100 Subject: [PATCH 2/6] Add test expl. for the use of doesCoordinatedEmission --- .../tck/IdentityProcessorVerification.java | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 573c0298..b0e9e84d 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -411,8 +411,27 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4 - // for multiple subscribers + + /** + * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s + * receive the same items and a terminal {@code Exception}. + *

+ * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, + * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. + *

+ * Verifies rule: 1.4 with multiple + * {@code Subscriber}s. + *

+ * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. + *

+ * If this test fails, the following could be checked within the {@code Publisher} implementation: + *

    + *
  • The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.
  • + *
  • The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or + * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s + * both have to request first.
  • + *
+ */ @Test public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { optionalMultipleSubscribersTest(2, new Function() { @@ -669,8 +688,26 @@ public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscr /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////// - // A Processor - // must trigger `requestFromUpstream` for elements that have been requested 'long ago' + /** + * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests + * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}. + *

+ * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, + * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. + *

+ * Verifies rule: 2.1 with multiple + * {@code Subscriber}s. + *

+ * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. + *

+ * If this test fails, the following could be checked within the {@code Publisher} implementation: + *

    + *
  • The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.
  • + *
  • The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or + * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s + * both have to request first.
  • + *
+ */ @Test public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable { optionalMultipleSubscribersTest(2, new Function() { From 62d385c2a386b1f5356674ae10aa51838bb22431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 26 Nov 2017 15:40:56 +0100 Subject: [PATCH 3/6] Remove code duplications --- .../tck/IdentityProcessorVerification.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index b0e9e84d..e7cab2a2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -457,8 +457,6 @@ public TestSetup apply(Long aLong) throws Throwable { sub1.request(1); sub2.request(1); - - sendError(ex); } else { sub1.request(1); expectRequest(); @@ -468,9 +466,9 @@ public TestSetup apply(Long aLong) throws Throwable { // sub1 has received one element, and has one demand pending // sub2 has not yet requested anything - - sendError(ex); } + sendError(ex); + sub1.expectError(ex); sub2.expectError(ex); @@ -744,10 +742,6 @@ public TestSetup apply(Long subscribers) throws Throwable { final T z = sendNextTFromUpstream(); expectNextElement(sub1, z); expectNextElement(sub2, z); - - if (totalRequests == 3) { - expectRequest(); - } } else { final T z = sendNextTFromUpstream(); expectNextElement(sub1, z); @@ -755,10 +749,9 @@ public TestSetup apply(Long subscribers) throws Throwable { sub2.request(1); expectNextElement(sub2, z); - - if (totalRequests == 3) { - expectRequest(); - } + } + if (totalRequests == 3) { + expectRequest(); } // to avoid error messages during test harness shutdown From 984063e0974738ca84247cbc12bc8a43e4fb7e2e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 27 Nov 2017 10:23:01 +0100 Subject: [PATCH 4/6] Error message about doesCoordinatedEmission. --- .../tck/IdentityProcessorVerification.java | 27 ++++++++++++++++--- .../reactivestreams/tck/TestEnvironment.java | 11 ++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index e7cab2a2..89dfd9ac 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -450,6 +450,8 @@ public TestSetup apply(Long aLong) throws Throwable { sub1.request(1); sub2.request(1); + expectRequest(); + final T x = sendNextTFromUpstream(); expectNextElement(sub1, x); @@ -459,9 +461,18 @@ public TestSetup apply(Long aLong) throws Throwable { sub2.request(1); } else { sub1.request(1); - expectRequest(); + + expectRequest(env.defaultTimeoutMillis(), + "If the Processor coordinates requests/emissions when having multiple Subscribers" + + " at once, please override doesCoordinatedEmission() in this " + + "IdentityProcessorVerification to allow this test to pass."); + final T x = sendNextTFromUpstream(); - expectNextElement(sub1, x); + expectNextElement(sub1, x, + "If the Processor coordinates requests/emissions when having multiple Subscribers" + + " at once, please override doesCoordinatedEmission() in this " + + "IdentityProcessorVerification to allow this test to pass."); + sub1.request(1); // sub1 has received one element, and has one demand pending @@ -744,7 +755,10 @@ public TestSetup apply(Long subscribers) throws Throwable { expectNextElement(sub2, z); } else { final T z = sendNextTFromUpstream(); - expectNextElement(sub1, z); + expectNextElement(sub1, z, + "If the Processor coordinates requests/emissions when having multiple Subscribers" + + " at once, please override doesCoordinatedEmission() in this " + + "IdentityProcessorVerification to allow this test to pass."); sub2.expectNone(); // since sub2 hasn't requested anything yet sub2.request(1); @@ -818,6 +832,13 @@ public void expectNextElement(ManualSubscriber sub, T expected) throws Interr } } + public void expectNextElement(ManualSubscriber sub, T expected, String errorMessageAddendum) throws InterruptedException { + final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum)); + if (!elem.equals(expected)) { + env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); + } + } + public T sendNextTFromUpstream() throws InterruptedException { final T x = nextT(); sendNext(x); diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 1d8cf062..8fdad9fc 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -763,6 +763,17 @@ public long expectRequest(long timeoutMillis) throws InterruptedException { } } + + public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException { + long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum)); + if (requested <= 0) { + return env.flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); + } else { + pendingDemand += requested; + return requested; + } + } + public void expectExactRequest(long expected) throws InterruptedException { expectExactRequest(expected, env.defaultTimeoutMillis()); } From 721002d79972bcce83d886c12c87a7a2a359e3bb Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 27 Nov 2017 10:28:12 +0100 Subject: [PATCH 5/6] Adjust doesCoordinatedEmission error message --- .../reactivestreams/tck/IdentityProcessorVerification.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 89dfd9ac..9d5be07f 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -464,13 +464,13 @@ public TestSetup apply(Long aLong) throws Throwable { expectRequest(env.defaultTimeoutMillis(), "If the Processor coordinates requests/emissions when having multiple Subscribers" - + " at once, please override doesCoordinatedEmission() in this " + + " at once, please override doesCoordinatedEmission() to return true in this " + "IdentityProcessorVerification to allow this test to pass."); final T x = sendNextTFromUpstream(); expectNextElement(sub1, x, "If the Processor coordinates requests/emissions when having multiple Subscribers" - + " at once, please override doesCoordinatedEmission() in this " + + " at once, please override doesCoordinatedEmission() to return true in this " + "IdentityProcessorVerification to allow this test to pass."); sub1.request(1); @@ -757,7 +757,7 @@ public TestSetup apply(Long subscribers) throws Throwable { final T z = sendNextTFromUpstream(); expectNextElement(sub1, z, "If the Processor coordinates requests/emissions when having multiple Subscribers" - + " at once, please override doesCoordinatedEmission() in this " + + " at once, please override doesCoordinatedEmission() to return true in this " + "IdentityProcessorVerification to allow this test to pass."); sub2.expectNone(); // since sub2 hasn't requested anything yet From 14e821b604a4612c03851ca8d16334a91d4fdb14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 29 Nov 2017 18:59:11 +0100 Subject: [PATCH 6/6] Fix javadoc, add Readme.md entry --- tck/README.md | 18 ++++++++++++++++++ .../tck/IdentityProcessorVerification.java | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/tck/README.md b/tck/README.md index 6fe670c2..872eb8e1 100644 --- a/tck/README.md +++ b/tck/README.md @@ -507,6 +507,24 @@ The `IdentityProcessorVerification` also runs additional "sanity" verifications, Specification rules, but help to verify that a `Processor` won't "get stuck" or face similar problems. Please refer to the sources for details on the tests included. +### Verifying Processors with request-coordinating behavior + +A request-coordinating `Processor` is a kind of a `Processor` implementation which may either + +- coordinate the request amounts of their `Subscriber`s and request only from upstream when all `Subscriber`s requested something; or +- coordinate emissions, requesting a bounded amount upfront from the upstream and then emitting only when all `Subscriber`s have requested something. + +From the downstream `Subscriber`s' perspective (and the TCK), both manifest as lack of emissions, and thus +the following test methods will likely fail with `timeout while awaiting X` error: + +- `required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError` +- `required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo` + +To verify such `Processor` implementations, override the `IdentityProcessorVerification.doesCoordinatedEmission()` +method and return `true`, which will instruct the tests above to request from both of their `TestSubscriber`s before +asserting the arrival of the required elements. + + ## Ignoring tests Since the tests are inherited instead of user defined it's not possible to use the usual `@Ignore` annotations to skip certain tests (which may be perfectly reasonable if the implementation has some know constraints on what it diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 9d5be07f..56456ac2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -424,7 +424,7 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() *

* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. *

- * If this test fails, the following could be checked within the {@code Publisher} implementation: + * If this test fails, the following could be checked within the {@code Processor} implementation: *

    *
  • The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.
  • *
  • The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or @@ -709,7 +709,7 @@ public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscr *

    * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. *

    - * If this test fails, the following could be checked within the {@code Publisher} implementation: + * If this test fails, the following could be checked within the {@code Processor} implementation: *

      *
    • The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.
    • *
    • The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or