From 6fbc3c6ca3e00de7f5ed5eddf52d641d15d3ec3c Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 24 Dec 2014 15:15:50 +0100 Subject: [PATCH 1/5] +tck #181 provides default helperPublisher for subscriber tests + When subscriber verifications are extended using Integer we can use the existing publisher implementations from examples to drive the tests. This allows implementers who only care about implementing Subscribers to not care at all about implementing Publishers + In case the subscriber verification is extended using a custom signal type (defined as "not Integer") we do not magically generate a Publisher of this type, but instead fail the tests telling the implementer that he/she will need to implement a custom helperPublisher which is exactly what we have up until now always demanded from implementations. + We also provide tests to verify the test failures work as expected, and that the messages are indeed helpful. + Updated TCK README.md to clarify when to implement a custom publisher + fixed mistakenly added `` on NumberPublisher - it is bound to Ints + TCK now depends on examples, as it uses the well documented async publisher to provide the default publisher for numbers. As well as in order to allow implementers to reuse those publishers when implementing custom Publishers to drive their Subscriber specs. ! Moved away from createHelper to createElement style of testing subscribers --- .travis.yml | 4 +- .../example/unicast/AsyncSubscriber.java | 1 - .../example/unicast/SyncSubscriber.java | 1 - .../example/unicast/AsyncSubscriberTest.java | 12 ++-- .../example/unicast/SyncSubscriberTest.java | 16 ++--- tck/README.md | 40 ++++++++--- tck/build.gradle | 1 + .../tck/IdentityProcessorVerification.java | 35 +++++++--- .../tck/SubscriberBlackboxVerification.java | 32 ++++----- .../tck/SubscriberWhiteboxVerification.java | 41 +++++------- .../tck/WithHelperPublisher.java | 67 +++++++++++++++++++ .../tck/support/HelperPublisher.java | 34 ++++++++++ .../tck/support/HelperPublisherException.java | 7 ++ .../tck/support/InfiniteHelperPublisher.java | 30 +++++++++ .../IdentityProcessorVerificationTest.java | 23 ++++++- .../SubscriberBlackboxVerificationTest.java | 31 +++++---- .../SubscriberWhiteboxVerificationTest.java | 23 ++++--- 17 files changed, 293 insertions(+), 105 deletions(-) create mode 100644 tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java diff --git a/.travis.yml b/.travis.yml index cd68d6b8..8b0f02bd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,11 @@ language: java script: -- ./gradlew check + - ./gradlew check cache: directories: - $HOME/.gradle jdk: -- openjdk6 + - openjdk6 env: global: - TERM=dumb diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java index f8883a22..d5660067 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java @@ -1,6 +1,5 @@ package org.reactivestreams.example.unicast; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java index 0ab1dbf6..6f2ef338 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java @@ -1,6 +1,5 @@ package org.reactivestreams.example.unicast; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index cbd301d0..326d5d0c 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -1,7 +1,5 @@ package org.reactivestreams.example.unicast; -import java.util.Collections; -import java.util.Iterator; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.tck.SubscriberBlackboxVerification; @@ -31,17 +29,11 @@ public AsyncSubscriberTest() { @Override public Subscriber createSubscriber() { return new AsyncSubscriber(e) { - private long acc; @Override protected boolean whenNext(final Integer element) { return true; } }; } - @SuppressWarnings("unchecked") - @Override public Publisher createHelperPublisher(long elements) { - if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e); - else return new NumberIterablePublisher(0, (int)elements, e); - } @Test public void testAccumulation() throws InterruptedException { @@ -64,4 +56,8 @@ public AsyncSubscriberTest() { latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS); assertEquals(i.get(), 45); } + + @Override public Integer createElement(int element) { + return element; + } } diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java index 9948b343..53afa873 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java @@ -1,17 +1,14 @@ package org.reactivestreams.example.unicast; -import java.util.Collections; -import java.util.Iterator; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.tck.SubscriberBlackboxVerification; import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.AfterClass; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Test // Must be here for TestNG to find and run this, do not remove public class SyncSubscriberTest extends SubscriberBlackboxVerification { @@ -39,9 +36,8 @@ public SyncSubscriberTest() { } }; } - @SuppressWarnings("unchecked") - @Override public Publisher createHelperPublisher(long elements) { - if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e); - else return new NumberIterablePublisher(0, (int)elements, e); + + @Override public Integer createElement(int element) { + return element; } } diff --git a/tck/README.md b/tck/README.md index db9663d2..8a31f091 100644 --- a/tck/README.md +++ b/tck/README.md @@ -180,6 +180,36 @@ Subscriber rules Verification is split up into two files (styles) of tests. The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely. +### Helper Publisher implementations +Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications +both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation. + +For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the +SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification`. +The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements, +thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement +an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers +which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher. + +If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification`, +*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message` +(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`) +contains some information and directs the implementer towards implementing a custom helper publisher, +which is done by overriding the `Publisher createHelperPublisher(long elements)` method: + +```java +@Override public Publisher createHelperPublisher(long elements) { + return new Publisher() { /* IMPL HERE */ }; +} +``` + +Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK +should be driven using `Integer` elements as default publishers are already implemented for this type. If the +`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher` that will +be able to signal the required element types. It is of course both possible and recommended to re-use existing +implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of +such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215) + ### Subscriber Blackbox Verification Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK: @@ -205,11 +235,6 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri public Subscriber createSubscriber() { return new MySubscriber(); } - - @Override - public Publisher createHelperPublisher(long elements) { - return new MyRangePublisher(1, elements); - } } ``` @@ -285,11 +310,6 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri }; } - @Override - public Publisher createHelperPublisher(long elements) { - return new MyRangePublisher(1, elements); - } - } ``` diff --git a/tck/build.gradle b/tck/build.gradle index 6fb509f0..52cd98cd 100644 --- a/tck/build.gradle +++ b/tck/build.gradle @@ -2,5 +2,6 @@ description = 'reactive-streams-tck' dependencies { compile group: 'org.testng', name: 'testng', version:'5.14.10' compile project(':reactive-streams') + compile project(':reactive-streams-examples') } test.useTestNG() diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 8f85d4de..1808c6c3 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -4,6 +4,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.reactivestreams.tck.Annotations.Subscribers; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; @@ -17,7 +18,8 @@ import java.util.HashSet; import java.util.Set; -public abstract class IdentityProcessorVerification implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { +public abstract class IdentityProcessorVerification extends WithHelperPublisher + implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { private final TestEnvironment env; @@ -65,6 +67,10 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { return IdentityProcessorVerification.this.createSubscriber(probe); } + @Override public T createElement(int element) { + return IdentityProcessorVerification.this.createElement(element); + } + @Override public Publisher createHelperPublisher(long elements) { return IdentityProcessorVerification.this.createHelperPublisher(elements); @@ -109,15 +115,24 @@ public boolean skipStochasticTests() { public abstract Processor createIdentityProcessor(int bufferSize); /** - * Helper method required for running the Publisher rules against a Publisher. - * It must create a Publisher for a stream with exactly the given number of elements. - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - * - * The stream must not produce the same element twice (in case of an infinite stream this requirement - * is relaxed to only apply to the elements that are actually requested during all tests). - * - * @param elements exact number of elements this publisher should emit, - * unless equal to {@code Long.MAX_VALUE} in which case the stream should be effectively infinite + * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. + *

+ * By default an asynchronously signalling Publisher is provided, which will use + * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type + * your Subscriber is able to consume. + *

+ * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber + * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for + * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: + *

    + *
  • + * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. + *
  • + *
  • + * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. + * In other words, it should represent a "completed stream". + *
  • + *
*/ public abstract Publisher createHelperPublisher(long elements); diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index c4e34b88..29c95b66 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -9,9 +9,14 @@ import org.reactivestreams.tck.support.TestException; import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules; import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; /** @@ -25,7 +30,8 @@ * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription */ -public abstract class SubscriberBlackboxVerification implements SubscriberBlackboxVerificationRules { +public abstract class SubscriberBlackboxVerification extends WithHelperPublisher + implements SubscriberBlackboxVerificationRules { private final TestEnvironment env; @@ -33,28 +39,24 @@ protected SubscriberBlackboxVerification(TestEnvironment env) { this.env = env; } + // USER API + /** * This is the main method you must implement in your test incarnation. * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. */ public abstract Subscriber createSubscriber(); + // ENV SETUP + /** - * Helper method required for generating test elements. - * It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements. - *

- * It also must treat the following numbers of elements in these specific ways: - *

    - *
  • - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - *
  • - *
  • - * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. - * In other words, it should represent a "completed stream". - *
  • - *
+ * Executor service used by the default provided asynchronous Publisher. + * @see #createHelperPublisher(long) */ - public abstract Publisher createHelperPublisher(long elements); + private ExecutorService publisherExecutor; + @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } + @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } + @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index acde4217..3fcef13c 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -9,9 +9,14 @@ import org.reactivestreams.tck.support.TestException; import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -21,7 +26,8 @@ * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription */ -public abstract class SubscriberWhiteboxVerification implements SubscriberWhiteboxVerificationRules { +public abstract class SubscriberWhiteboxVerification extends WithHelperPublisher + implements SubscriberWhiteboxVerificationRules { private final TestEnvironment env; @@ -29,6 +35,8 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) { this.env = env; } + // USER API + /** * This is the main method you must implement in your test incarnation. * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. @@ -38,33 +46,16 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) { */ public abstract Subscriber createSubscriber(WhiteboxSubscriberProbe probe); - /** - * Helper method required for generating test elements. - * It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements. - *

- * It also must treat the following numbers of elements in these specific ways: - *

    - *
  • - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - *
  • - *
  • - * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. - * In other words, it should represent a "completed stream". - *
  • - *
- */ - public abstract Publisher createHelperPublisher(long elements); + // ENV SETUP /** - * Used to break possibly infinite wait-loops. - * Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext} - * signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations. - * - * Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example). + * Executor service used by the default provided asynchronous Publisher. + * @see #createHelperPublisher(long) */ - public long maxOnNextSignalsInTest() { - return 100; - } + private ExecutorService publisherExecutor; + @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } + @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } + @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// diff --git a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java new file mode 100644 index 00000000..fe8ef6c5 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java @@ -0,0 +1,67 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.support.Function; +import org.reactivestreams.tck.support.HelperPublisher; +import org.reactivestreams.tck.support.InfiniteHelperPublisher; + +import java.util.concurrent.ExecutorService; + +/** + * Type which is able to create elements based on a seed {@code id} value. + *

+ * Simplest implementations will simply return the incoming id as the element. + * + * @param type of element to be delivered to the Subscriber + */ +public abstract class WithHelperPublisher { + + /** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */ + public abstract ExecutorService publisherExecutorService(); + + /** + * Implement this method to match your expected element type. + * In case of implementing a simple Subscriber which is able to consume any kind of element simply return the + * incoming {@code element} element. + *

+ * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement + * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be + * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier. + * + * @return element of the matching type {@code T} that will be delivered to the tested Subscriber + */ + public abstract T createElement(int element); + + /** + * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. + *

+ * By default an asynchronously signalling Publisher is provided, which will use + * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type + * your Subscriber is able to consume. + *

+ * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber + * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for + * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: + *

    + *
  • + * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. + *
  • + *
  • + * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. + * In other words, it should represent a "completed stream". + *
  • + *
+ */ + @SuppressWarnings("unchecked") + public Publisher createHelperPublisher(long elements) { + final Function mkElement = new Function() { + @Override public T apply(Integer id) throws Throwable { + return createElement(id); + } + }; + + if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService()); + else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService()); + } + +} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java new file mode 100644 index 00000000..7168fd0b --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java @@ -0,0 +1,34 @@ +package org.reactivestreams.tck.support; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.Executor; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.example.unicast.AsyncIterablePublisher; + +public class HelperPublisher extends AsyncIterablePublisher { + + public HelperPublisher(final int from, final int to, final Function create, final Executor executor) { + super(new Iterable() { + { if(from > to) throw new IllegalArgumentException("from must be equal or greater than to!"); } + @Override public Iterator iterator() { + return new Iterator() { + private int at = from; + @Override public boolean hasNext() { return at < to; } + @Override public T next() { + if (!hasNext()) return Collections.emptyList().iterator().next(); + else try { + return create.apply(at++); + } catch (Throwable t) { + throw new HelperPublisherException( + String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } +} \ No newline at end of file diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java new file mode 100644 index 00000000..4cb22524 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java @@ -0,0 +1,7 @@ +package org.reactivestreams.tck.support; + +public final class HelperPublisherException extends RuntimeException { + public HelperPublisherException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java new file mode 100644 index 00000000..7b7cabda --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java @@ -0,0 +1,30 @@ +package org.reactivestreams.tck.support; + +import org.reactivestreams.example.unicast.AsyncIterablePublisher; + +import java.util.Iterator; +import java.util.concurrent.Executor; + +public class InfiniteHelperPublisher extends AsyncIterablePublisher { + + public InfiniteHelperPublisher(final Function create, final Executor executor) { + super(new Iterable() { + @Override public Iterator iterator() { + return new Iterator() { + private int at = 0; + + @Override public boolean hasNext() { return true; } + @Override public T next() { + try { + return create.apply(at++); // Wraps around on overflow + } catch (Throwable t) { + throw new HelperPublisherException( + String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } +} \ No newline at end of file diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 0ff89e92..18cfb507 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -5,8 +5,13 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors. * Important: Please note that all Processors implemented in this file are *wrong*! @@ -15,6 +20,10 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable { requireTestSkip(new ThrowingRunnable() { @@ -24,6 +33,12 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo return new NoopProcessor(); } + @Override public ExecutorService publisherExecutorService() { return ex; } + + @Override public Object createElement(int element) { + return null; + } + @Override public Publisher createHelperPublisher(long elements) { return SKIP; } @@ -44,7 +59,7 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldFailWhileWaitingForOnError() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { - new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) { + new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) { @Override public Processor createIdentityProcessor(int bufferSize) { return new Processor() { @Override public void subscribe(final Subscriber s) { @@ -76,6 +91,10 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo }; } + @Override public ExecutorService publisherExecutorService() { return ex; } + + @Override public Integer createElement(int element) { return element; } + @Override public Publisher createHelperPublisher(long elements) { return new Publisher() { @Override public void subscribe(final Subscriber s) { @@ -90,7 +109,7 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo }; } - @Override public Publisher createErrorStatePublisher() { + @Override public Publisher createErrorStatePublisher() { return SKIP; } }.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(); diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java index aefa3efd..3d0d94ad 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java @@ -1,13 +1,14 @@ package org.reactivestreams.tck; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Validates that the TCK's {@link org.reactivestreams.tck.SubscriberBlackboxVerification} fails with nice human readable errors. @@ -17,6 +18,10 @@ public class SubscriberBlackboxVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest_shouldFailBy_notGettingRequestCall() throws Throwable { requireTestFailure(new ThrowingRunnable() { @@ -135,15 +140,17 @@ public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWith /** * Verification using a Subscriber that doesn't do anything on any of the callbacks */ - final SubscriberBlackboxVerification noopSubscriberVerification() { + final SubscriberBlackboxVerification noopSubscriberVerification() throws Exception { return new SubscriberBlackboxVerification(newTestEnvironment()) { @Override public Subscriber createSubscriber() { return new NoopSubscriber(); } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); + @Override public Integer createElement(int element) { + return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -167,9 +174,9 @@ final SubscriberBlackboxVerification simpleSubscriberVerification() { }; } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -182,9 +189,9 @@ final SubscriberBlackboxVerification customSubscriberVerification(final return sub; } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index cf62f8b8..7a291465 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -1,17 +1,18 @@ package org.reactivestreams.tck; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.SubscriberWhiteboxVerification.SubscriberPuppet; -import org.reactivestreams.tck.SubscriberWhiteboxVerification.SubscriberPuppeteer; import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriberProbe; import org.reactivestreams.tck.support.Function; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors. @@ -21,6 +22,10 @@ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void required_spec201_mustSignalDemandViaSubscriptionRequest_shouldFailBy_notGettingRequestCall() throws Throwable { // this mostly verifies the probe is injected correctly @@ -335,9 +340,9 @@ public Subscriber createSubscriber(final WhiteboxSubscriberProbe createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -355,9 +360,9 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe pro } } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } From e9ee2cda4167f4a39098494959c35257ff07607e Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 29 Dec 2014 19:19:26 +0100 Subject: [PATCH 2/5] +tck 2.9 blackbox verification should fully drive publisher It should not use the helper publisher as it may signal asynchronously as well as onComplete in undefined ways - we need to control the publisher in this case because of the very specific test scenario --- .../example/unicast/AsyncSubscriberTest.java | 10 +++- tck/README.md | 60 ++++++++++++------- .../tck/IdentityProcessorVerification.java | 1 - .../tck/SubscriberBlackboxVerification.java | 20 ++++++- .../tck/WithHelperPublisher.java | 3 + .../tck/support/HelperPublisher.java | 3 +- .../tck/support/HelperPublisherException.java | 7 --- .../tck/support/InfiniteHelperPublisher.java | 2 +- .../IdentityProcessorVerificationTest.java | 12 ++-- 9 files changed, 75 insertions(+), 43 deletions(-) delete mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index 326d5d0c..d971c2ab 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -17,14 +17,14 @@ @Test // Must be here for TestNG to find and run this, do not remove public class AsyncSubscriberTest extends SubscriberBlackboxVerification { - final static long DefaultTimeoutMillis = 100; + final static long DEFAULT_TIMEOUT_MILLIS = 100; private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public AsyncSubscriberTest() { - super(new TestEnvironment(DefaultTimeoutMillis)); + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); } @Override public Subscriber createSubscriber() { @@ -53,11 +53,15 @@ public AsyncSubscriberTest() { }; new NumberIterablePublisher(0, 10, e).subscribe(sub); - latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS); + latch.await(DEFAULT_TIMEOUT_MILLIS * 10, TimeUnit.MILLISECONDS); assertEquals(i.get(), 45); } @Override public Integer createElement(int element) { return element; } + + @Override public Publisher createHelperPublisher(long elements) { + return super.createHelperPublisher(elements); + } } diff --git a/tck/README.md b/tck/README.md index 8a31f091..435b3664 100644 --- a/tck/README.md +++ b/tck/README.md @@ -51,7 +51,7 @@ Specification rule abides the following naming convention: `spec###_DESC` where: ``` The prefixes of the names of the test methods are used in order to signify the character of the test. For example, these are the kinds of prefixes you may find: -"required_", "optional_", "stochastic_", "untested_". +"`required_`", "`optional_`", "`stochastic_`", "`untested_`". Explanations: @@ -180,22 +180,37 @@ Subscriber rules Verification is split up into two files (styles) of tests. The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely. -### Helper Publisher implementations +### createElement and Helper Publisher implementations Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications -both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation. +both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation. +The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution. -For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the -SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification`. -The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements, -thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement -an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers -which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher. +While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber +may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such +special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of +Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of +elements flowing into the `Subscriber`, as known thanks to `... extends WhiteboxSubscriberVerification`) representing +an element of the stream that will be passed on to the Subscriber. -If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification`, -*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message` -(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`) -contains some information and directs the implementer towards implementing a custom helper publisher, -which is done by overriding the `Publisher createHelperPublisher(long elements)` method: +The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types: + +```java +public class MySubscriberTest extends SubscriberBlackboxVerification { + + // ... + + @Override + public Integer createElement(int element) { return element; } +} +``` + + +The `createElement` method MAY be called from multiple +threads, so in case of more complicated implementations, please be aware of this fact. + +**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher` +which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your +own Publisher which will then be used by the TCK to drive your Subscriber tests: ```java @Override public Publisher createHelperPublisher(long elements) { @@ -203,13 +218,6 @@ which is done by overriding the `Publisher createHelperPublisher(long element } ``` -Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK -should be driven using `Integer` elements as default publishers are already implemented for this type. If the -`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher` that will -be able to signal the required element types. It is of course both possible and recommended to re-use existing -implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of -such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215) - ### Subscriber Blackbox Verification Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK: @@ -235,6 +243,11 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri public Subscriber createSubscriber() { return new MySubscriber(); } + + @Override + public Integer createElement(int element) { + return element; + } } ``` @@ -310,6 +323,11 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri }; } + @Override + public Integer createElement(int element) { + return element; + } + } ``` diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 1808c6c3..55276acb 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -4,7 +4,6 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.Annotations.Subscribers; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index 29c95b66..37cb91a9 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; @@ -224,7 +225,24 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { @Override public void run(BlackboxTestStage stage) throws Throwable { - final Publisher pub = createHelperPublisher(0); + final Publisher pub = new Publisher() { + @Override public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean completed = false; + + @Override public void request(long n) { + if (!completed) { + completed = true; + s.onComplete(); // Publisher now realises that it is in fact already completed + } + } + + @Override public void cancel() { + // noop, ignore + } + }); + } + }; final Subscriber sub = createSubscriber(); final BlackboxSubscriberProxy probe = stage.createBlackboxSubscriberProxy(env, sub); diff --git a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java index fe8ef6c5..2380350a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java @@ -27,6 +27,9 @@ public abstract class WithHelperPublisher { * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier. + *

+ * Warning: This method may be called concurrently by the helper publisher, thus it should be implemented in a + * thread-safe manner. * * @return element of the matching type {@code T} that will be delivered to the tested Subscriber */ diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java index 7168fd0b..ba0fce68 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java @@ -22,8 +22,7 @@ public HelperPublisher(final int from, final int to, final Function else try { return create.apply(at++); } catch (Throwable t) { - throw new HelperPublisherException( - String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + throw new IllegalStateException(String.format("Failed to create element for id %d!", at - 1), t); } } @Override public void remove() { throw new UnsupportedOperationException(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java deleted file mode 100644 index 4cb22524..00000000 --- a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.reactivestreams.tck.support; - -public final class HelperPublisherException extends RuntimeException { - public HelperPublisherException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java index 7b7cabda..94897e4b 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java @@ -18,7 +18,7 @@ public InfiniteHelperPublisher(final Function create, final Executor try { return create.apply(at++); // Wraps around on overflow } catch (Throwable t) { - throw new HelperPublisherException( + throw new IllegalStateException( String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); } } diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 18cfb507..28f146e0 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -28,22 +28,20 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable { requireTestSkip(new ThrowingRunnable() { @Override public void run() throws Throwable { - new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){ - @Override public Processor createIdentityProcessor(int bufferSize) { + new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){ + @Override public Processor createIdentityProcessor(int bufferSize) { return new NoopProcessor(); } @Override public ExecutorService publisherExecutorService() { return ex; } - @Override public Object createElement(int element) { - return null; - } + @Override public Integer createElement(int element) { return element; } - @Override public Publisher createHelperPublisher(long elements) { + @Override public Publisher createHelperPublisher(long elements) { return SKIP; } - @Override public Publisher createErrorStatePublisher() { + @Override public Publisher createErrorStatePublisher() { return SKIP; } From b17a364911cbde7bb594a490dd6159b0c8ada20a Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Fri, 2 Jan 2015 23:06:36 +0100 Subject: [PATCH 3/5] +tck timeouts now have default values as well as are ENV overrideable --- .travis.yml | 2 + .../example/unicast/AsyncSubscriberTest.java | 5 +- .../unicast/IterablePublisherTest.java | 5 +- .../example/unicast/SyncSubscriberTest.java | 4 +- ...nboundedIntegerIncrementPublisherTest.java | 4 +- tck/README.md | 74 +++++++++++-- .../tck/IdentityProcessorVerification.java | 23 ++-- .../tck/PublisherVerification.java | 102 +++++++++++++----- .../tck/SubscriberBlackboxVerification.java | 5 +- .../reactivestreams/tck/TestEnvironment.java | 48 +++++++++ .../IdentityProcessorVerificationTest.java | 2 +- .../tck/PublisherVerificationTest.java | 15 ++- .../SubscriberBlackboxVerificationTest.java | 4 +- .../SubscriberWhiteboxVerificationTest.java | 4 +- 14 files changed, 220 insertions(+), 77 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8b0f02bd..f987c6f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,3 +9,5 @@ jdk: env: global: - TERM=dumb + - DEFAULT_TIMEOUT_MILLIS=300 + - PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300 diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index d971c2ab..6fb98472 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -17,14 +17,13 @@ @Test // Must be here for TestNG to find and run this, do not remove public class AsyncSubscriberTest extends SubscriberBlackboxVerification { - final static long DEFAULT_TIMEOUT_MILLIS = 100; private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public AsyncSubscriberTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); + super(new TestEnvironment()); } @Override public Subscriber createSubscriber() { @@ -53,7 +52,7 @@ public AsyncSubscriberTest() { }; new NumberIterablePublisher(0, 10, e).subscribe(sub); - latch.await(DEFAULT_TIMEOUT_MILLIS * 10, TimeUnit.MILLISECONDS); + latch.await(env.defaultTimeoutMillis() * 10, TimeUnit.MILLISECONDS); assertEquals(i.get(), 45); } diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java index ef8cc8af..15d5ec9a 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java @@ -15,15 +15,12 @@ @Test // Must be here for TestNG to find and run this, do not remove public class IterablePublisherTest extends PublisherVerification { - final static long DefaultTimeoutMillis = 100; - final static long PublisherReferenceGCTimeoutMillis = 300; - private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public IterablePublisherTest() { - super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis); + super(new TestEnvironment()); } @SuppressWarnings("unchecked") diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java index 53afa873..c9303a08 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java @@ -13,14 +13,12 @@ @Test // Must be here for TestNG to find and run this, do not remove public class SyncSubscriberTest extends SubscriberBlackboxVerification { - final static long DefaultTimeoutMillis = 100; - private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public SyncSubscriberTest() { - super(new TestEnvironment(DefaultTimeoutMillis)); + super(new TestEnvironment()); } @Override public Subscriber createSubscriber() { diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java index 6a5432b4..40548933 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java @@ -11,15 +11,13 @@ @Test // Must be here for TestNG to find and run this, do not remove public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification { - final static long DefaultTimeoutMillis = 200; - final static long PublisherReferenceGCTimeoutMillis = 500; private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public UnboundedIntegerIncrementPublisherTest() { - super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis); + super(new TestEnvironment()); } @Override public Publisher createPublisher(long elements) { diff --git a/tck/README.md b/tck/README.md index 435b3664..21a20978 100644 --- a/tck/README.md +++ b/tck/README.md @@ -132,11 +132,8 @@ import org.reactivestreams.tck.TestEnvironment; public class RangePublisherTest extends PublisherVerification { - public static final long DEFAULT_TIMEOUT_MILLIS = 300L; - public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 1000L; - public RangePublisherTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + super(new TestEnvironment()); } @Override @@ -174,6 +171,39 @@ Notable configuration options include: * `boundedDepthOfOnNextAndRequestRecursion` – which should only be overridden in case of synchronous Publishers. This number will be used to validate if a `Subscription` actually solves the "unbounded recursion" problem (Rule 3.3). +### Timeout configuration +Publisher tests make use of two kinds of timeouts, one is the `defaultTimeoutMillis` which corresponds to all methods used +within the TCK which await for something to happen. The other timeout is `publisherReferenceGCTimeoutMillis` which is only used in order to verify +[Rule 3.13](https://github.com/reactive-streams/reactive-streams#3.13) which defines that subscriber references MUST be dropped +by the Publisher. + +In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either: + +**Use env variables** to set these timeouts, in which case the you can just: + +```bash +export DEFAULT_TIMEOUT_MILLIS=300 +export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=500 +``` + +Or **define the timeouts explicitly in code**: + +```java +public class RangePublisherTest extends PublisherVerification { + + public static final long DEFAULT_TIMEOUT_MILLIS = 300L; + public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L; + + public RangePublisherTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + } + + // ... +} +``` + +Note that hard-coded values *take precedence* over environment set values (!). + ## Subscriber Verification Subscriber rules Verification is split up into two files (styles) of tests. @@ -233,10 +263,8 @@ import org.reactivestreams.tck.TestEnvironment; public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVerification { - public static final long DEFAULT_TIMEOUT_MILLIS = 300L; - public MySubscriberBlackboxVerificationTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); + super(new TestEnvironment()); } @Override @@ -273,10 +301,8 @@ import org.reactivestreams.tck.TestEnvironment; public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification { - public static final long DEFAULT_TIMEOUT_MILLIS = 300L; - public MySubscriberWhiteboxVerificationTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); + super(new TestEnvironment()); } @Override @@ -331,6 +357,34 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri } ``` +### Timeout configuration +Similarily to `PublisherVerification`, it is possible to set the timeouts used by the TCK to validate subscriber behaviour. +This can be set either by using env variables or hardcoded explicitly. + +**Use env variables** to set the timeout value to be used by the TCK: + +```bash +export DEFAULT_TIMEOUT_MILLIS=300 +``` + +Or **define the timeout explicitly in code**: + +```java +public class MySubscriberTest extends BlackboxSubscriberVerification { + + public static final long DEFAULT_TIMEOUT_MILLIS = 300L; + + public RangePublisherTest() { + super(new MySubscriberTest(DEFAULT_TIMEOUT_MILLIS)); + } + + // ... +} +``` + +Note that hard-coded values *take precedence* over environment set values (!). + + ## Subscription Verification Please note that while `Subscription` does **not** have it's own test class, it's rules are validated inside of the `Publisher` and `Subscriber` tests – depending if the Rule demands specific action to be taken by the publishing, or subscribing side of the subscription contract. diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 55276acb..3660200a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -36,27 +36,38 @@ public abstract class IdentityProcessorVerification extends WithHelperPublish // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`. private final int processorBufferSize; + /** + * Test class must specify the expected time it takes for the publisher to + * shut itself down when the the last downstream {@code Subscription} is cancelled. + * + * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. + */ + @SuppressWarnings("unused") + public IdentityProcessorVerification(final TestEnvironment env) { + this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE); + } + /** * Test class must specify the expected time it takes for the publisher to * shut itself down when the the last downstream {@code Subscription} is cancelled. * * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. * - * @param publisherShutdownTimeoutMillis expected time which a processor requires to shut itself down + * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. */ @SuppressWarnings("unused") - public IdentityProcessorVerification(final TestEnvironment env, long publisherShutdownTimeoutMillis) { - this(env, publisherShutdownTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE); + public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) { + this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE); } /** * Test class must specify the expected time it takes for the publisher to * shut itself down when the the last downstream {@code Subscription} is cancelled. * - * @param publisherShutdownTimeoutMillis expected time which a processor requires to shut itself down + * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. * @param processorBufferSize number of elements the processor is required to be able to buffer. */ - public IdentityProcessorVerification(final TestEnvironment env, long publisherShutdownTimeoutMillis, int processorBufferSize) { + public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { this.env = env; this.processorBufferSize = processorBufferSize; @@ -76,7 +87,7 @@ public Publisher createHelperPublisher(long elements) { } }; - publisherVerification = new PublisherVerification(env, publisherShutdownTimeoutMillis) { + publisherVerification = new PublisherVerification(env, publisherReferenceGCTimeoutMillis) { @Override public Publisher createPublisher(long elements) { return IdentityProcessorVerification.this.createPublisher(elements); diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 4033461e..64f1ab54 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -33,14 +33,50 @@ */ public abstract class PublisherVerification implements PublisherVerificationRules { + private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; + private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; + private final TestEnvironment env; private final long publisherReferenceGCTimeoutMillis; + /** + * Constructs a new verification class using the given env and configuration. + * + * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. + */ public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { this.env = env; this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis; } + /** + * Constructs a new verification class using the given env and configuration. + * + * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. + */ + public PublisherVerification(TestEnvironment env) { + this.env = env; + this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); + } + + /** + * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, + * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). + * + * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. + * + * @throws java.lang.IllegalArgumentException when unable to parse the env variable + */ + public static long envPublisherReferenceGCTimeoutMillis() { + final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); + if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; + else try { + return Long.parseLong(envMillis); + } catch(NumberFormatException ex) { + throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); + } + } + /** * This is the main method you must implement in your test incarnation. * It must create a Publisher for a stream with exactly the given number of elements. @@ -88,6 +124,14 @@ public long boundedDepthOfOnNextAndRequestRecursion() { return 1; } + /** + * The amount of time after which a cancelled Subscriber reference should be dropped. + * See Rule 3.13 for details. + */ + final public long publisherReferenceGCTimeoutMillis() { + return publisherReferenceGCTimeoutMillis; + } + ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// @BeforeMethod @@ -298,20 +342,20 @@ public void onComplete() { public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { try { whenHasErrorPublisherTest(new PublisherTestRun() { - @Override - public void run(final Publisher pub) throws InterruptedException { - final Latch latch = new Latch(env); - pub.subscribe(new TestEnvironment.TestSubscriber(env) { - @Override - public void onError(Throwable cause) { - latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); - latch.close(); - } - }); + @Override + public void run(final Publisher pub) throws InterruptedException { + final Latch latch = new Latch(env); + pub.subscribe(new TestEnvironment.TestSubscriber(env) { + @Override + public void onError(Throwable cause) { + latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); + latch.close(); + } + }); - latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); + latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); - env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); } }); } catch (SkipException se) { @@ -404,25 +448,25 @@ public void run(Publisher pub) throws Throwable { @Override @Test public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { whenHasErrorPublisherTest(new PublisherTestRun() { - @Override - public void run(Publisher pub) throws Throwable { - final Latch onErrorLatch = new Latch(env); - ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(env) { - @Override - public void onError(Throwable cause) { - onErrorLatch.assertOpen("Only one onError call expected"); - onErrorLatch.close(); - } + @Override + public void run(Publisher pub) throws Throwable { + final Latch onErrorLatch = new Latch(env); + ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(env) { + @Override + public void onError(Throwable cause) { + onErrorLatch.assertOpen("Only one onError call expected"); + onErrorLatch.close(); + } - @Override - public void onSubscribe(Subscription subs) { - env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber"); - } - }; - pub.subscribe(sub); - onErrorLatch.assertClosed("Should have received onError"); + @Override + public void onSubscribe(Subscription subs) { + env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber"); + } + }; + pub.subscribe(sub); + onErrorLatch.assertClosed("Should have received onError"); - env.verifyNoAsyncErrors(); + env.verifyNoAsyncErrors(); } }); } diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index 37cb91a9..8a43a625 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -6,8 +6,8 @@ import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.support.Optional; -import org.reactivestreams.tck.support.TestException; import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules; +import org.reactivestreams.tck.support.TestException; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -16,7 +16,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; @@ -34,7 +33,7 @@ public abstract class SubscriberBlackboxVerification extends WithHelperPublisher implements SubscriberBlackboxVerificationRules { - private final TestEnvironment env; + protected final TestEnvironment env; protected SubscriberBlackboxVerification(TestEnvironment env) { this.env = env; diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index ad1d16bd..e0cb41af 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -6,6 +6,7 @@ import org.reactivestreams.tck.support.SubscriberBufferOverflowException; import org.reactivestreams.tck.support.Optional; +import java.text.NumberFormat; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; @@ -19,6 +20,9 @@ public class TestEnvironment { public static final int TEST_BUFFER_SIZE = 16; + private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; + private static final long DEFAULT_TIMEOUT_MILLIS = 100; + private final long defaultTimeoutMillis; private final boolean printlnDebug; @@ -51,10 +55,54 @@ public TestEnvironment(long defaultTimeoutMillis) { this(defaultTimeoutMillis, false); } + /** + * Tests must specify the timeout for expected outcome of asynchronous + * interactions. Longer timeout does not invalidate the correctness of + * the implementation, but can in some cases result in longer time to + * run the tests. + * + * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} + * or the default value ({@see TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. + * + * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, + * often helpful to pinpoint simple race conditions etc. + */ + public TestEnvironment(boolean printlnDebug) { + this(envDefaultTimeoutMillis(), printlnDebug); + } + + /** + * Tests must specify the timeout for expected outcome of asynchronous + * interactions. Longer timeout does not invalidate the correctness of + * the implementation, but can in some cases result in longer time to + * run the tests. + * + * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} + * or the default value ({@see TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. + */ + public TestEnvironment() { + this(envDefaultTimeoutMillis()); + } + public long defaultTimeoutMillis() { return defaultTimeoutMillis; } + /** + * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. + * + * @throws java.lang.IllegalArgumentException when unable to parse the env variable + */ + public static long envDefaultTimeoutMillis() { + final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); + if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; + else try { + return Long.parseLong(envMillis); + } catch(NumberFormatException ex) { + throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); + } + } + /** * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 28f146e0..a6a02717 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -18,7 +18,7 @@ */ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { - static final int DEFAULT_TIMEOUT_MILLIS = 100; + static final long DEFAULT_TIMEOUT_MILLIS = TestEnvironment.envDefaultTimeoutMillis(); private ExecutorService ex; @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index 15274404..83603d9d 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -21,9 +21,6 @@ */ public class PublisherVerificationTest extends TCKVerificationSupport { - static final int DEFAULT_TIMEOUT_MILLIS = 100; - static final int GC_TIMEOUT_MILLIS = 300; - @Test public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements_shouldFailBy_ExpectingOnError() throws Throwable { requireTestFailure(new ThrowingRunnable() { @@ -527,7 +524,7 @@ static class NoopSubscription implements Subscription { * Skips the error state publisher tests. */ final PublisherVerification noopPublisherVerification() { - return new PublisherVerification(newTestEnvironment(), GC_TIMEOUT_MILLIS) { + return new PublisherVerification(newTestEnvironment()) { @Override public Publisher createPublisher(long elements) { return new Publisher() { @@ -548,7 +545,7 @@ final PublisherVerification noopPublisherVerification() { * Verification using a Publisher that never publishes any element */ final PublisherVerification onErroringPublisherVerification() { - return new PublisherVerification(newTestEnvironment(), GC_TIMEOUT_MILLIS) { + return new PublisherVerification(newTestEnvironment()) { @Override public Publisher createPublisher(long elements) { return new Publisher() { @@ -580,7 +577,7 @@ final PublisherVerification customPublisherVerification(final Publisher * Custom Verification using given Publishers */ final PublisherVerification customPublisherVerification(final Publisher pub, final Publisher errorPub) { - return new PublisherVerification(newTestEnvironment(), GC_TIMEOUT_MILLIS) { + return new PublisherVerification(newTestEnvironment()) { @Override public Publisher createPublisher(long elements) { return pub; } @@ -595,7 +592,7 @@ final PublisherVerification customPublisherVerification(final Publisher * Verification using a Publisher that publishes elements even with no demand available */ final PublisherVerification demandIgnoringSynchronousPublisherVerification() { - return new PublisherVerification(newTestEnvironment(), GC_TIMEOUT_MILLIS) { + return new PublisherVerification(newTestEnvironment()) { @Override public Publisher createPublisher(long elements) { return new Publisher() { @@ -639,7 +636,7 @@ final PublisherVerification demandIgnoringAsynchronousPublisherVerifica final AtomicBoolean concurrentAccessCaused = new AtomicBoolean(false); - return new PublisherVerification(newTestEnvironment(), GC_TIMEOUT_MILLIS) { + return new PublisherVerification(newTestEnvironment()) { @Override public Publisher createPublisher(long elements) { return new Publisher() { @@ -709,7 +706,7 @@ final PublisherVerification demandIgnoringAsynchronousPublisherVerifica } private TestEnvironment newTestEnvironment() { - return new TestEnvironment(DEFAULT_TIMEOUT_MILLIS); + return new TestEnvironment(); } diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java index 3d0d94ad..aab1979f 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java @@ -16,8 +16,6 @@ */ public class SubscriberBlackboxVerificationTest extends TCKVerificationSupport { - static final int DEFAULT_TIMEOUT_MILLIS = 100; - private ExecutorService ex; @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (ex != null) ex.shutdown(); } @@ -223,7 +221,7 @@ static class KeepSubscriptionSubscriber extends NoopSubscriber { } private TestEnvironment newTestEnvironment() { - return new TestEnvironment(DEFAULT_TIMEOUT_MILLIS); + return new TestEnvironment(); } } diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index 7a291465..fb7023f2 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -20,8 +20,6 @@ */ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport { - static final int DEFAULT_TIMEOUT_MILLIS = 100; - private ExecutorService ex; @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (ex != null) ex.shutdown(); } @@ -411,7 +409,7 @@ public SimpleSubscriberWithProbe(WhiteboxSubscriberProbe probe) { } private TestEnvironment newTestEnvironment() { - return new TestEnvironment(DEFAULT_TIMEOUT_MILLIS); + return new TestEnvironment(); } } From 68a5a7e50c4e57b680f5cc5e75984354c036605f Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 7 Jan 2015 15:11:43 +0100 Subject: [PATCH 4/5] +tck fixes spec303 to not leak "still publishing" publisher Since the Publisher could stay alive and keep publishing, and keep getting demand in the recursion depth detecting test it would exceed the buffer size by accumulating these onNext signals, and then cause an error during *a different tests execution* Now we do not accumulate these elements, and also set an explicit limit on how long we try to blow up the recursive stack depth. Stack depth is now also logged in as debug information which should help implementers notice how their imple behave in this scenario. The publisher is now always canceled at the end of this test, in case it did not reach the max nr of elements it is allowed to publish here, nor has it signalled completion. --- .../example/unicast/AsyncSubscriberTest.java | 3 -- tck/README.md | 2 +- .../tck/IdentityProcessorVerification.java | 22 --------- .../tck/PublisherVerification.java | 48 ++++++++++++++++--- .../tck/WithHelperPublisher.java | 5 +- 5 files changed, 45 insertions(+), 35 deletions(-) diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index 6fb98472..596ae1da 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -60,7 +60,4 @@ public AsyncSubscriberTest() { return element; } - @Override public Publisher createHelperPublisher(long elements) { - return super.createHelperPublisher(elements); - } } diff --git a/tck/README.md b/tck/README.md index 21a20978..6180e1f8 100644 --- a/tck/README.md +++ b/tck/README.md @@ -202,7 +202,7 @@ public class RangePublisherTest extends PublisherVerification { } ``` -Note that hard-coded values *take precedence* over environment set values (!). +Note that explicitly passed in values take precedence over values provided by the environment ## Subscriber Verification diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 3660200a..9daa5a0f 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -124,28 +124,6 @@ public boolean skipStochasticTests() { */ public abstract Processor createIdentityProcessor(int bufferSize); - /** - * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. - *

- * By default an asynchronously signalling Publisher is provided, which will use - * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type - * your Subscriber is able to consume. - *

- * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber - * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for - * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: - *

    - *
  • - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - *
  • - *
  • - * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. - * In other words, it should represent a "completed stream". - *
  • - *
- */ - public abstract Publisher createHelperPublisher(long elements); - /** * Return a Publisher that immediately signals {@code onError} to incoming subscriptions, * or {@code null} in order to skip them. diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 64f1ab54..9c287cbf 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -637,18 +637,33 @@ protected Long initialValue() { } }; + final Latch runCompleted = new Latch(env); + final ManualSubscriber sub = new ManualSubscriberWithSubscriptionSupport(env) { + // counts the number of signals received, used to break out from possibly infinite request/onNext loops + long signalsReceived = 0L; + @Override public void onNext(T element) { + // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements + // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing + + signalsReceived += 1; stackDepthCounter.set(stackDepthCounter.get() + 1); - super.onNext(element); + env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element)); - final Long callsUntilNow = stackDepthCounter.get(); + final long callsUntilNow = stackDepthCounter.get(); if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) { env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion())); // stop the recursive call chain + runCompleted.close(); + return; + } else if (signalsReceived >= oneMoreThanBoundedLimit) { + // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and + // stop the recursive call chain + runCompleted.close(); return; } @@ -657,14 +672,35 @@ public void onNext(T element) { stackDepthCounter.set(stackDepthCounter.get() - 1); } + + @Override + public void onComplete() { + super.onComplete(); + runCompleted.close(); + } + + @Override + public void onError(Throwable cause) { + super.onError(cause); + runCompleted.close(); + } }; - env.subscribe(pub, sub); + try { + env.subscribe(pub, sub); - sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` + sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` - sub.nextElementOrEndOfStream(); - env.verifyNoAsyncErrors(); + final String msg = String.format("Unable to validate call stack depth safety, " + + "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion", + oneMoreThanBoundedLimit); + runCompleted.expectClose(env.defaultTimeoutMillis(), msg); + env.verifyNoAsyncErrors(); + } finally { + // since the request/onNext recursive calls may keep the publisher running "forever", + // we MUST cancel it manually before exiting this test case + sub.cancel(); + } } }); } diff --git a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java index 2380350a..879e9b6a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java @@ -38,9 +38,8 @@ public abstract class WithHelperPublisher { /** * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. *

- * By default an asynchronously signalling Publisher is provided, which will use - * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type - * your Subscriber is able to consume. + * By default an asynchronously signalling Publisher is provided, which will use {@link #createElement(int)} + * to generate elements type your Subscriber is able to consume. *

* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for From fdf9a78159c5a246ebeaa2014aee0959a1a2c62d Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 13 Jan 2015 00:25:27 +0100 Subject: [PATCH 5/5] +build make buildlog include tests run and fail stacktraces Thanks to @alkemist for pointing this out --- build.gradle | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/build.gradle b/build.gradle index de6a436d..eecb748a 100644 --- a/build.gradle +++ b/build.gradle @@ -25,6 +25,13 @@ subprojects { } } + tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "failed", "started" + } + } + repositories { mavenCentral() }