Skip to content

+tck #181 provides default helperPublisher for subscriber tests #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
language: java
script:
- ./gradlew check
- ./gradlew check
cache:
directories:
- $HOME/.gradle
jdk:
- openjdk6
- openjdk6
env:
global:
- TERM=dumb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.reactivestreams.example.unicast;

import java.util.Collections;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
Expand All @@ -19,29 +17,23 @@

@Test // Must be here for TestNG to find and run this, do not remove
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
final static long DefaultTimeoutMillis = 100;
final static long DEFAULT_TIMEOUT_MILLIS = 100;

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public AsyncSubscriberTest() {
super(new TestEnvironment(DefaultTimeoutMillis));
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
}

@Override public Subscriber<Integer> createSubscriber() {
return new AsyncSubscriber<Integer>(e) {
private long acc;
@Override protected boolean whenNext(final Integer element) {
return true;
}
};
}
@SuppressWarnings("unchecked")
@Override public Publisher<Integer> createHelperPublisher(long elements) {
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
else return new NumberIterablePublisher(0, (int)elements, e);
}

@Test public void testAccumulation() throws InterruptedException {

Expand All @@ -60,8 +52,16 @@ public AsyncSubscriberTest() {
}
};

new NumberIterablePublisher<Integer>(0, 10, e).subscribe(sub);
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
new NumberIterablePublisher(0, 10, e).subscribe(sub);
latch.await(DEFAULT_TIMEOUT_MILLIS * 10, TimeUnit.MILLISECONDS);
assertEquals(i.get(), 45);
}

@Override public Integer createElement(int element) {
return element;
}

@Override public Publisher<Integer> createHelperPublisher(long elements) {
return super.createHelperPublisher(elements);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package org.reactivestreams.example.unicast;

import java.util.Collections;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
Expand Down Expand Up @@ -39,9 +36,8 @@ public SyncSubscriberTest() {
}
};
}
@SuppressWarnings("unchecked")
@Override public Publisher<Integer> createHelperPublisher(long elements) {
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
else return new NumberIterablePublisher(0, (int)elements, e);

@Override public Integer createElement(int element) {
return element;
}
}
46 changes: 42 additions & 4 deletions tck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,44 @@ Subscriber rules Verification is split up into two files (styles) of tests.

The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.

### createElement and Helper Publisher implementations
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation.
The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution.

While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber
may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such
special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of
Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of
elements flowing into the `Subscriber<T>`, as known thanks to `... extends WhiteboxSubscriberVerification<T>`) representing
an element of the stream that will be passed on to the Subscriber.

The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types:

```java
public class MySubscriberTest extends SubscriberBlackboxVerification<Integer> {

// ...

@Override
public Integer createElement(int element) { return element; }
}
```


The `createElement` method MAY be called from multiple
threads, so in case of more complicated implementations, please be aware of this fact.

**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher`
which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your
own Publisher which will then be used by the TCK to drive your Subscriber tests:

```java
@Override public Publisher<Message> createHelperPublisher(long elements) {
return new Publisher<Message>() { /* IMPL HERE */ };
}
```

### Subscriber Blackbox Verification

Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
Expand Down Expand Up @@ -205,8 +243,8 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
}

@Override
public Publisher<Integer> createHelperPublisher(long elements) {
return new MyRangePublisher<Integer>(1, elements);
public Integer createElement(int element) {
return element;
}
}
```
Expand Down Expand Up @@ -284,8 +322,8 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
}

@Override
public Publisher<Integer> createHelperPublisher(long elements) {
return new MyRangePublisher<Integer>(1, elements);
public Integer createElement(int element) {
return element;
}

}
Expand Down
1 change: 1 addition & 0 deletions tck/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ description = 'reactive-streams-tck'
dependencies {
compile group: 'org.testng', name: 'testng', version:'5.14.10'
compile project(':reactive-streams')
compile project(':reactive-streams-examples')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TCK is pulled in by users into test scope anyway, so this will not really pollute libraries and makes the helpful implementations (examples) available in test scope for users to play around with them

}
test.useTestNG()
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.HashSet;
import java.util.Set;

public abstract class IdentityProcessorVerification<T> {
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> {

private final TestEnvironment env;

Expand Down Expand Up @@ -64,6 +64,10 @@ public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
return IdentityProcessorVerification.this.createSubscriber(probe);
}

@Override public T createElement(int element) {
return IdentityProcessorVerification.this.createElement(element);
}

@Override
public Publisher<T> createHelperPublisher(long elements) {
return IdentityProcessorVerification.this.createHelperPublisher(elements);
Expand Down Expand Up @@ -108,15 +112,24 @@ public boolean skipStochasticTests() {
public abstract Processor<T, T> createIdentityProcessor(int bufferSize);

/**
* Helper method required for running the Publisher rules against a Publisher.
* It must create a Publisher for a stream with exactly the given number of elements.
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
*
* The stream must not produce the same element twice (in case of an infinite stream this requirement
* is relaxed to only apply to the elements that are actually requested during all tests).
*
* @param elements exact number of elements this publisher should emit,
* unless equal to {@code Long.MAX_VALUE} in which case the stream should be effectively infinite
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
* <p>
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
* your Subscriber is able to consume.
* <p>
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
* (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
* <ul>
* <li>
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
* </li>
* <li>
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
* In other words, it should represent a "completed stream".
* </li>
* </ul>
*/
public abstract Publisher<T> createHelperPublisher(long elements);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
import org.reactivestreams.tck.support.Optional;
import org.reactivestreams.tck.support.TestException;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.reactivestreams.tck.Annotations.NotVerified;
import static org.reactivestreams.tck.Annotations.Required;
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
Expand All @@ -26,36 +32,32 @@
* @see org.reactivestreams.Subscriber
* @see org.reactivestreams.Subscription
*/
public abstract class SubscriberBlackboxVerification<T> {
public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> {

private final TestEnvironment env;

protected SubscriberBlackboxVerification(TestEnvironment env) {
this.env = env;
}

// USER API

/**
* This is the main method you must implement in your test incarnation.
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
*/
public abstract Subscriber<T> createSubscriber();

// ENV SETUP

/**
* Helper method required for generating test elements.
* It must create a {@link org.reactivestreams.Publisher} for a stream with exactly the given number of elements.
* <p>
* It also must treat the following numbers of elements in these specific ways:
* <ul>
* <li>
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
* </li>
* <li>
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
* In other words, it should represent a "completed stream".
* </li>
* </ul>
* Executor service used by the default provided asynchronous Publisher.
* @see #createHelperPublisher(long)
*/
public abstract Publisher<T> createHelperPublisher(long elements);
private ExecutorService publisherExecutor;
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }

////////////////////// TEST ENV CLEANUP /////////////////////////////////////

Expand Down Expand Up @@ -223,7 +225,24 @@ public void spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPreced
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Publisher<T> pub = createHelperPublisher(0);
final Publisher<T> pub = new Publisher<T>() {
@Override public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
private boolean completed = false;

@Override public void request(long n) {
if (!completed) {
completed = true;
s.onComplete(); // Publisher now realises that it is in fact already completed
}
}

@Override public void cancel() {
// noop, ignore
}
});
}
};

final Subscriber<T> sub = createSubscriber();
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
Expand Down
Loading