From e3c4e589c03c0bb0877aecd3b462a2cbbf98b435 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 24 Dec 2014 15:15:50 +0100 Subject: [PATCH 1/2] +tck #181 provides default helperPublisher for subscriber tests + When subscriber verifications are extended using Integer we can use the existing publisher implementations from examples to drive the tests. This allows implementers who only care about implementing Subscribers to not care at all about implementing Publishers + In case the subscriber verification is extended using a custom signal type (defined as "not Integer") we do not magically generate a Publisher of this type, but instead fail the tests telling the implementer that he/she will need to implement a custom helperPublisher which is exactly what we have up until now always demanded from implementations. + We also provide tests to verify the test failures work as expected, and that the messages are indeed helpful. + Updated TCK README.md to clarify when to implement a custom publisher + fixed mistakenly added `` on NumberPublisher - it is bound to Ints + TCK now depends on examples, as it uses the well documented async publisher to provide the default publisher for numbers. As well as in order to allow implementers to reuse those publishers when implementing custom Publishers to drive their Subscriber specs. ! Moved away from createHelper to createElement style of testing subscribers --- .travis.yml | 4 +- .../example/unicast/AsyncSubscriber.java | 1 - .../example/unicast/SyncSubscriber.java | 1 - .../example/unicast/AsyncSubscriberTest.java | 14 ++-- .../example/unicast/SyncSubscriberTest.java | 16 ++--- tck/README.md | 40 ++++++++--- tck/build.gradle | 1 + .../tck/IdentityProcessorVerification.java | 33 ++++++--- .../tck/SubscriberBlackboxVerification.java | 31 ++++----- .../tck/SubscriberWhiteboxVerification.java | 41 +++++------- .../tck/WithHelperPublisher.java | 67 +++++++++++++++++++ .../tck/support/HelperPublisher.java | 34 ++++++++++ .../tck/support/HelperPublisherException.java | 7 ++ .../tck/support/InfiniteHelperPublisher.java | 30 +++++++++ .../IdentityProcessorVerificationTest.java | 23 ++++++- .../SubscriberBlackboxVerificationTest.java | 31 +++++---- .../SubscriberWhiteboxVerificationTest.java | 23 ++++--- 17 files changed, 290 insertions(+), 107 deletions(-) create mode 100644 tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java create mode 100644 tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java diff --git a/.travis.yml b/.travis.yml index cd68d6b8..8b0f02bd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,11 @@ language: java script: -- ./gradlew check + - ./gradlew check cache: directories: - $HOME/.gradle jdk: -- openjdk6 + - openjdk6 env: global: - TERM=dumb diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java index f8883a22..d5660067 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java @@ -1,6 +1,5 @@ package org.reactivestreams.example.unicast; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java index 0ab1dbf6..6f2ef338 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java @@ -1,6 +1,5 @@ package org.reactivestreams.example.unicast; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index 8a1f9cf1..326d5d0c 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -1,7 +1,5 @@ package org.reactivestreams.example.unicast; -import java.util.Collections; -import java.util.Iterator; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.tck.SubscriberBlackboxVerification; @@ -31,17 +29,11 @@ public AsyncSubscriberTest() { @Override public Subscriber createSubscriber() { return new AsyncSubscriber(e) { - private long acc; @Override protected boolean whenNext(final Integer element) { return true; } }; } - @SuppressWarnings("unchecked") - @Override public Publisher createHelperPublisher(long elements) { - if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e); - else return new NumberIterablePublisher(0, (int)elements, e); - } @Test public void testAccumulation() throws InterruptedException { @@ -60,8 +52,12 @@ public AsyncSubscriberTest() { } }; - new NumberIterablePublisher(0, 10, e).subscribe(sub); + new NumberIterablePublisher(0, 10, e).subscribe(sub); latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS); assertEquals(i.get(), 45); } + + @Override public Integer createElement(int element) { + return element; + } } diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java index 9948b343..53afa873 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java @@ -1,17 +1,14 @@ package org.reactivestreams.example.unicast; -import java.util.Collections; -import java.util.Iterator; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.tck.SubscriberBlackboxVerification; import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.AfterClass; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Test // Must be here for TestNG to find and run this, do not remove public class SyncSubscriberTest extends SubscriberBlackboxVerification { @@ -39,9 +36,8 @@ public SyncSubscriberTest() { } }; } - @SuppressWarnings("unchecked") - @Override public Publisher createHelperPublisher(long elements) { - if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e); - else return new NumberIterablePublisher(0, (int)elements, e); + + @Override public Integer createElement(int element) { + return element; } } diff --git a/tck/README.md b/tck/README.md index 1d1ac378..7c85d929 100644 --- a/tck/README.md +++ b/tck/README.md @@ -178,6 +178,36 @@ Subscriber rules Verification is split up into two files (styles) of tests. The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely. +### Helper Publisher implementations +Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications +both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation. + +For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the +SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification`. +The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements, +thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement +an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers +which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher. + +If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification`, +*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message` +(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`) +contains some information and directs the implementer towards implementing a custom helper publisher, +which is done by overriding the `Publisher createHelperPublisher(long elements)` method: + +```java +@Override public Publisher createHelperPublisher(long elements) { + return new Publisher() { /* IMPL HERE */ }; +} +``` + +Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK +should be driven using `Integer` elements as default publishers are already implemented for this type. If the +`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher` that will +be able to signal the required element types. It is of course both possible and recommended to re-use existing +implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of +such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215) + ### Subscriber Blackbox Verification Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK: @@ -203,11 +233,6 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri public Subscriber createSubscriber() { return new MySubscriber(); } - - @Override - public Publisher createHelperPublisher(long elements) { - return new MyRangePublisher(1, elements); - } } ``` @@ -283,11 +308,6 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri }; } - @Override - public Publisher createHelperPublisher(long elements) { - return new MyRangePublisher(1, elements); - } - } ``` diff --git a/tck/build.gradle b/tck/build.gradle index 6fb509f0..52cd98cd 100644 --- a/tck/build.gradle +++ b/tck/build.gradle @@ -2,5 +2,6 @@ description = 'reactive-streams-tck' dependencies { compile group: 'org.testng', name: 'testng', version:'5.14.10' compile project(':reactive-streams') + compile project(':reactive-streams-examples') } test.useTestNG() diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 39d2be98..3cb76186 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -16,7 +16,7 @@ import java.util.HashSet; import java.util.Set; -public abstract class IdentityProcessorVerification { +public abstract class IdentityProcessorVerification extends WithHelperPublisher { private final TestEnvironment env; @@ -64,6 +64,10 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { return IdentityProcessorVerification.this.createSubscriber(probe); } + @Override public T createElement(int element) { + return IdentityProcessorVerification.this.createElement(element); + } + @Override public Publisher createHelperPublisher(long elements) { return IdentityProcessorVerification.this.createHelperPublisher(elements); @@ -108,15 +112,24 @@ public boolean skipStochasticTests() { public abstract Processor createIdentityProcessor(int bufferSize); /** - * Helper method required for running the Publisher rules against a Publisher. - * It must create a Publisher for a stream with exactly the given number of elements. - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - * - * The stream must not produce the same element twice (in case of an infinite stream this requirement - * is relaxed to only apply to the elements that are actually requested during all tests). - * - * @param elements exact number of elements this publisher should emit, - * unless equal to {@code Long.MAX_VALUE} in which case the stream should be effectively infinite + * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. + *

+ * By default an asynchronously signalling Publisher is provided, which will use + * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type + * your Subscriber is able to consume. + *

+ * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber + * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for + * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: + *

    + *
  • + * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. + *
  • + *
  • + * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. + * In other words, it should represent a "completed stream". + *
  • + *
*/ public abstract Publisher createHelperPublisher(long elements); diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index ac31c173..f127b950 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -8,9 +8,14 @@ import org.reactivestreams.tck.support.Optional; import org.reactivestreams.tck.support.TestException; import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.reactivestreams.tck.Annotations.NotVerified; import static org.reactivestreams.tck.Annotations.Required; import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; @@ -26,7 +31,7 @@ * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription */ -public abstract class SubscriberBlackboxVerification { +public abstract class SubscriberBlackboxVerification extends WithHelperPublisher { private final TestEnvironment env; @@ -34,28 +39,24 @@ protected SubscriberBlackboxVerification(TestEnvironment env) { this.env = env; } + // USER API + /** * This is the main method you must implement in your test incarnation. * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. */ public abstract Subscriber createSubscriber(); + // ENV SETUP + /** - * Helper method required for generating test elements. - * It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements. - *

- * It also must treat the following numbers of elements in these specific ways: - *

    - *
  • - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - *
  • - *
  • - * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. - * In other words, it should represent a "completed stream". - *
  • - *
+ * Executor service used by the default provided asynchronous Publisher. + * @see #createHelperPublisher(long) */ - public abstract Publisher createHelperPublisher(long elements); + private ExecutorService publisherExecutor; + @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } + @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } + @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index 1397d68a..6948e19e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -5,13 +5,17 @@ import org.reactivestreams.Subscription; import org.reactivestreams.tck.Annotations.Additional; import org.reactivestreams.tck.TestEnvironment.*; -import org.reactivestreams.tck.support.Function; import org.reactivestreams.tck.support.Optional; import org.reactivestreams.tck.support.TestException; import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.reactivestreams.tck.Annotations.NotVerified; import static org.reactivestreams.tck.Annotations.Required; import static org.testng.Assert.assertEquals; @@ -23,7 +27,7 @@ * @see org.reactivestreams.Subscriber * @see org.reactivestreams.Subscription */ -public abstract class SubscriberWhiteboxVerification { +public abstract class SubscriberWhiteboxVerification extends WithHelperPublisher { private final TestEnvironment env; @@ -31,6 +35,8 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) { this.env = env; } + // USER API + /** * This is the main method you must implement in your test incarnation. * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. @@ -40,33 +46,16 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) { */ public abstract Subscriber createSubscriber(WhiteboxSubscriberProbe probe); - /** - * Helper method required for generating test elements. - * It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements. - *

- * It also must treat the following numbers of elements in these specific ways: - *

    - *
  • - * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. - *
  • - *
  • - * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. - * In other words, it should represent a "completed stream". - *
  • - *
- */ - public abstract Publisher createHelperPublisher(long elements); + // ENV SETUP /** - * Used to break possibly infinite wait-loops. - * Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext} - * signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations. - * - * Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example). + * Executor service used by the default provided asynchronous Publisher. + * @see #createHelperPublisher(long) */ - public long maxOnNextSignalsInTest() { - return 100; - } + private ExecutorService publisherExecutor; + @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } + @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } + @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// diff --git a/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java new file mode 100644 index 00000000..fe8ef6c5 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/WithHelperPublisher.java @@ -0,0 +1,67 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.support.Function; +import org.reactivestreams.tck.support.HelperPublisher; +import org.reactivestreams.tck.support.InfiniteHelperPublisher; + +import java.util.concurrent.ExecutorService; + +/** + * Type which is able to create elements based on a seed {@code id} value. + *

+ * Simplest implementations will simply return the incoming id as the element. + * + * @param type of element to be delivered to the Subscriber + */ +public abstract class WithHelperPublisher { + + /** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */ + public abstract ExecutorService publisherExecutorService(); + + /** + * Implement this method to match your expected element type. + * In case of implementing a simple Subscriber which is able to consume any kind of element simply return the + * incoming {@code element} element. + *

+ * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement + * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be + * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier. + * + * @return element of the matching type {@code T} that will be delivered to the tested Subscriber + */ + public abstract T createElement(int element); + + /** + * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. + *

+ * By default an asynchronously signalling Publisher is provided, which will use + * {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type + * your Subscriber is able to consume. + *

+ * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber + * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for + * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: + *

    + *
  • + * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. + *
  • + *
  • + * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. + * In other words, it should represent a "completed stream". + *
  • + *
+ */ + @SuppressWarnings("unchecked") + public Publisher createHelperPublisher(long elements) { + final Function mkElement = new Function() { + @Override public T apply(Integer id) throws Throwable { + return createElement(id); + } + }; + + if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService()); + else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService()); + } + +} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java new file mode 100644 index 00000000..7168fd0b --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java @@ -0,0 +1,34 @@ +package org.reactivestreams.tck.support; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.Executor; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.example.unicast.AsyncIterablePublisher; + +public class HelperPublisher extends AsyncIterablePublisher { + + public HelperPublisher(final int from, final int to, final Function create, final Executor executor) { + super(new Iterable() { + { if(from > to) throw new IllegalArgumentException("from must be equal or greater than to!"); } + @Override public Iterator iterator() { + return new Iterator() { + private int at = from; + @Override public boolean hasNext() { return at < to; } + @Override public T next() { + if (!hasNext()) return Collections.emptyList().iterator().next(); + else try { + return create.apply(at++); + } catch (Throwable t) { + throw new HelperPublisherException( + String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } +} \ No newline at end of file diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java new file mode 100644 index 00000000..4cb22524 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java @@ -0,0 +1,7 @@ +package org.reactivestreams.tck.support; + +public final class HelperPublisherException extends RuntimeException { + public HelperPublisherException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java new file mode 100644 index 00000000..7b7cabda --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java @@ -0,0 +1,30 @@ +package org.reactivestreams.tck.support; + +import org.reactivestreams.example.unicast.AsyncIterablePublisher; + +import java.util.Iterator; +import java.util.concurrent.Executor; + +public class InfiniteHelperPublisher extends AsyncIterablePublisher { + + public InfiniteHelperPublisher(final Function create, final Executor executor) { + super(new Iterable() { + @Override public Iterator iterator() { + return new Iterator() { + private int at = 0; + + @Override public boolean hasNext() { return true; } + @Override public T next() { + try { + return create.apply(at++); // Wraps around on overflow + } catch (Throwable t) { + throw new HelperPublisherException( + String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } +} \ No newline at end of file diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 9c62ef92..cdb30384 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -5,8 +5,13 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors. * Important: Please note that all Processors implemented in this file are *wrong*! @@ -15,6 +20,10 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable { requireTestSkip(new ThrowingRunnable() { @@ -24,6 +33,12 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera return new NoopProcessor(); } + @Override public ExecutorService publisherExecutorService() { return ex; } + + @Override public Object createElement(int element) { + return null; + } + @Override public Publisher createHelperPublisher(long elements) { return SKIP; } @@ -44,7 +59,7 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldFailWhileWaitingForOnError() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { - new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) { + new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) { @Override public Processor createIdentityProcessor(int bufferSize) { return new Processor() { @Override public void subscribe(final Subscriber s) { @@ -76,6 +91,10 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera }; } + @Override public ExecutorService publisherExecutorService() { return ex; } + + @Override public Integer createElement(int element) { return element; } + @Override public Publisher createHelperPublisher(long elements) { return new Publisher() { @Override public void subscribe(final Subscriber s) { @@ -90,7 +109,7 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera }; } - @Override public Publisher createErrorStatePublisher() { + @Override public Publisher createErrorStatePublisher() { return SKIP; } }.spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(); diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java index 8cd350be..ced068b0 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java @@ -1,13 +1,14 @@ package org.reactivestreams.tck; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Validates that the TCK's {@link org.reactivestreams.tck.SubscriberBlackboxVerification} fails with nice human readable errors. @@ -17,6 +18,10 @@ public class SubscriberBlackboxVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void spec201_blackbox_mustSignalDemandViaSubscriptionRequest_shouldFailBy_notGettingRequestCall() throws Throwable { requireTestFailure(new ThrowingRunnable() { @@ -135,15 +140,17 @@ public void spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPreceding /** * Verification using a Subscriber that doesn't do anything on any of the callbacks */ - final SubscriberBlackboxVerification noopSubscriberVerification() { + final SubscriberBlackboxVerification noopSubscriberVerification() throws Exception { return new SubscriberBlackboxVerification(newTestEnvironment()) { @Override public Subscriber createSubscriber() { return new NoopSubscriber(); } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); + @Override public Integer createElement(int element) { + return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -167,9 +174,9 @@ final SubscriberBlackboxVerification simpleSubscriberVerification() { }; } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -182,9 +189,9 @@ final SubscriberBlackboxVerification customSubscriberVerification(final return sub; } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index 6aa2ee94..3256fb4a 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -1,17 +1,18 @@ package org.reactivestreams.tck; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.reactivestreams.tck.SubscriberWhiteboxVerification.SubscriberPuppet; -import org.reactivestreams.tck.SubscriberWhiteboxVerification.SubscriberPuppeteer; import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriberProbe; import org.reactivestreams.tck.support.Function; import org.reactivestreams.tck.support.TCKVerificationSupport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors. @@ -21,6 +22,10 @@ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport { static final int DEFAULT_TIMEOUT_MILLIS = 100; + private ExecutorService ex; + @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (ex != null) ex.shutdown(); } + @Test public void spec201_mustSignalDemandViaSubscriptionRequest_shouldFailBy_notGettingRequestCall() throws Throwable { // this mostly verifies the probe is injected correctly @@ -351,9 +356,9 @@ public Subscriber createSubscriber(final WhiteboxSubscriberProbe createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } @@ -371,9 +376,9 @@ public Subscriber createSubscriber(WhiteboxSubscriberProbe pro } } - @Override public Publisher createHelperPublisher(long elements) { - return newSimpleIntsPublisher(elements); - } + @Override public Integer createElement(int element) { return element; } + + @Override public ExecutorService publisherExecutorService() { return ex; } }; } From cf715fa07d5e29561376187de0a8fe0879897315 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 29 Dec 2014 19:19:26 +0100 Subject: [PATCH 2/2] +tck 2.9 blackbox verification should fully drive publisher It should not use the helper publisher as it may signal asynchronously as well as onComplete in undefined ways - we need to control the publisher in this case because of the very specific test scenario --- .../example/unicast/AsyncSubscriberTest.java | 10 +++- tck/README.md | 58 ++++++++++++------- .../tck/SubscriberBlackboxVerification.java | 20 ++++++- .../tck/support/HelperPublisher.java | 3 +- .../tck/support/HelperPublisherException.java | 7 --- .../tck/support/InfiniteHelperPublisher.java | 2 +- .../IdentityProcessorVerificationTest.java | 12 ++-- 7 files changed, 71 insertions(+), 41 deletions(-) delete mode 100644 tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java index 326d5d0c..d971c2ab 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java @@ -17,14 +17,14 @@ @Test // Must be here for TestNG to find and run this, do not remove public class AsyncSubscriberTest extends SubscriberBlackboxVerification { - final static long DefaultTimeoutMillis = 100; + final static long DEFAULT_TIMEOUT_MILLIS = 100; private ExecutorService e; @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } @AfterClass void after() { if (e != null) e.shutdown(); } public AsyncSubscriberTest() { - super(new TestEnvironment(DefaultTimeoutMillis)); + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS)); } @Override public Subscriber createSubscriber() { @@ -53,11 +53,15 @@ public AsyncSubscriberTest() { }; new NumberIterablePublisher(0, 10, e).subscribe(sub); - latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS); + latch.await(DEFAULT_TIMEOUT_MILLIS * 10, TimeUnit.MILLISECONDS); assertEquals(i.get(), 45); } @Override public Integer createElement(int element) { return element; } + + @Override public Publisher createHelperPublisher(long elements) { + return super.createHelperPublisher(elements); + } } diff --git a/tck/README.md b/tck/README.md index 7c85d929..d4316e53 100644 --- a/tck/README.md +++ b/tck/README.md @@ -178,22 +178,37 @@ Subscriber rules Verification is split up into two files (styles) of tests. The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely. -### Helper Publisher implementations +### createElement and Helper Publisher implementations Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications -both provide a default "helper publisher" to drive its test and also alow to replace this Publisher with a custom implementation. +both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation. +The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution. -For simple Subscribers which are able to consume elements of *any type*, it is **highly recommmended** to extend the -SubscriberVerification (described below) classes providing the element type `java.lang.Integer`, like so: `... extends SubscriberBlackboxVerification`. -The reason for this is, that the TCK contains a default Publisher implementation which is able to signal `Integer` elements, -thus alowing the implementer to strictly focus on only implementing a proper `Subscriber`, instead of having to implement -an additional Publisher only in order to drive the Subscribers tests. This is especially important for library implementers -which only want to implement a Subscriber – and do not want to spend time or thought on implementing a valid Publisher. +While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber +may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such +special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of +Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of +elements flowing into the `Subscriber`, as known thanks to `... extends WhiteboxSubscriberVerification`) representing +an element of the stream that will be passed on to the Subscriber. -If however any SubscriberVerification class is extended using a custom element type, e.g. like this `... extends SubscriberBlackboxVerification`, -*the TCK will immediatly fail the entire subscriber test class* as it is unable to properly create signals of type `Message` -(which can be some custom message type the `Subscriber` is able to consume). The exception thrown (`UnableToProvidePublisherException`) -contains some information and directs the implementer towards implementing a custom helper publisher, -which is done by overriding the `Publisher createHelperPublisher(long elements)` method: +The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types: + +```java +public class MySubscriberTest extends SubscriberBlackboxVerification { + + // ... + + @Override + public Integer createElement(int element) { return element; } +} +``` + + +The `createElement` method MAY be called from multiple +threads, so in case of more complicated implementations, please be aware of this fact. + +**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher` +which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your +own Publisher which will then be used by the TCK to drive your Subscriber tests: ```java @Override public Publisher createHelperPublisher(long elements) { @@ -201,13 +216,6 @@ which is done by overriding the `Publisher createHelperPublisher(long element } ``` -Summing up, we recommend implementing Subscribers which are able to consume any type of element, in this case the TCK -should be driven using `Integer` elements as default publishers are already implemented for this type. If the -`Subscriber` is unable to consume `Integer` elements, the implementer MUST implement a custom `Publisher` that will -be able to signal the required element types. It is of course both possible and recommended to re-use existing -implemenations (which can be seen in the examples sub-project) to create these custom Publishers – an example of -such re-use can be found in [ProvidedHelperPublisherForSubscriberVerificationTest#createStringPublisher](https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/test/java/org/reactivestreams/tck/ProvidedHelperPublisherForSubscriberVerificationTest.java#L215) - ### Subscriber Blackbox Verification Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK: @@ -233,6 +241,11 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri public Subscriber createSubscriber() { return new MySubscriber(); } + + @Override + public Integer createElement(int element) { + return element; + } } ``` @@ -308,6 +321,11 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri }; } + @Override + public Integer createElement(int element) { + return element; + } + } ``` diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index f127b950..75769ace 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.reactivestreams.tck.Annotations.NotVerified; import static org.reactivestreams.tck.Annotations.Required; @@ -224,7 +225,24 @@ public void spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPreced blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { @Override public void run(BlackboxTestStage stage) throws Throwable { - final Publisher pub = createHelperPublisher(0); + final Publisher pub = new Publisher() { + @Override public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean completed = false; + + @Override public void request(long n) { + if (!completed) { + completed = true; + s.onComplete(); // Publisher now realises that it is in fact already completed + } + } + + @Override public void cancel() { + // noop, ignore + } + }); + } + }; final Subscriber sub = createSubscriber(); final BlackboxSubscriberProxy probe = stage.createBlackboxSubscriberProxy(env, sub); diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java index 7168fd0b..ba0fce68 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisher.java @@ -22,8 +22,7 @@ public HelperPublisher(final int from, final int to, final Function else try { return create.apply(at++); } catch (Throwable t) { - throw new HelperPublisherException( - String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); + throw new IllegalStateException(String.format("Failed to create element for id %d!", at - 1), t); } } @Override public void remove() { throw new UnsupportedOperationException(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java b/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java deleted file mode 100644 index 4cb22524..00000000 --- a/tck/src/main/java/org/reactivestreams/tck/support/HelperPublisherException.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.reactivestreams.tck.support; - -public final class HelperPublisherException extends RuntimeException { - public HelperPublisherException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java index 7b7cabda..94897e4b 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/InfiniteHelperPublisher.java @@ -18,7 +18,7 @@ public InfiniteHelperPublisher(final Function create, final Executor try { return create.apply(at++); // Wraps around on overflow } catch (Throwable t) { - throw new HelperPublisherException( + throw new IllegalStateException( String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t); } } diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index cdb30384..b15940fc 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -28,22 +28,20 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable { requireTestSkip(new ThrowingRunnable() { @Override public void run() throws Throwable { - new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){ - @Override public Processor createIdentityProcessor(int bufferSize) { + new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){ + @Override public Processor createIdentityProcessor(int bufferSize) { return new NoopProcessor(); } @Override public ExecutorService publisherExecutorService() { return ex; } - @Override public Object createElement(int element) { - return null; - } + @Override public Integer createElement(int element) { return element; } - @Override public Publisher createHelperPublisher(long elements) { + @Override public Publisher createHelperPublisher(long elements) { return SKIP; } - @Override public Publisher createErrorStatePublisher() { + @Override public Publisher createErrorStatePublisher() { return SKIP; }