Skip to content

+TCK: Provide helper publisher (rebased) #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
language: java
script:
- ./gradlew check
- ./gradlew check
cache:
directories:
- $HOME/.gradle
jdk:
- openjdk6
- openjdk6
env:
global:
- TERM=dumb
- DEFAULT_TIMEOUT_MILLIS=300
- PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ subprojects {
}
}

tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "failed", "started"
}
}

repositories {
mavenCentral()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.reactivestreams.example.unicast;

import java.util.Collections;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
Expand All @@ -19,29 +17,22 @@

@Test // Must be here for TestNG to find and run this, do not remove
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {
final static long DefaultTimeoutMillis = 100;

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public AsyncSubscriberTest() {
super(new TestEnvironment(DefaultTimeoutMillis));
super(new TestEnvironment());
}

@Override public Subscriber<Integer> createSubscriber() {
return new AsyncSubscriber<Integer>(e) {
private long acc;
@Override protected boolean whenNext(final Integer element) {
return true;
}
};
}
@SuppressWarnings("unchecked")
@Override public Publisher<Integer> createHelperPublisher(long elements) {
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
else return new NumberIterablePublisher(0, (int)elements, e);
}

@Test public void testAccumulation() throws InterruptedException {

Expand All @@ -61,7 +52,12 @@ public AsyncSubscriberTest() {
};

new NumberIterablePublisher(0, 10, e).subscribe(sub);
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
latch.await(env.defaultTimeoutMillis() * 10, TimeUnit.MILLISECONDS);
assertEquals(i.get(), 45);
}

@Override public Integer createElement(int element) {
return element;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
@Test // Must be here for TestNG to find and run this, do not remove
public class IterablePublisherTest extends PublisherVerification<Integer> {

final static long DefaultTimeoutMillis = 100;
final static long PublisherReferenceGCTimeoutMillis = 300;

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public IterablePublisherTest() {
super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis);
super(new TestEnvironment());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
package org.reactivestreams.example.unicast;

import java.util.Collections;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {

final static long DefaultTimeoutMillis = 100;

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public SyncSubscriberTest() {
super(new TestEnvironment(DefaultTimeoutMillis));
super(new TestEnvironment());
}

@Override public Subscriber<Integer> createSubscriber() {
Expand All @@ -39,9 +34,8 @@ public SyncSubscriberTest() {
}
};
}
@SuppressWarnings("unchecked")
@Override public Publisher<Integer> createHelperPublisher(long elements) {
if (elements > Integer.MAX_VALUE) return new InfiniteIncrementNumberPublisher(e);
else return new NumberIterablePublisher(0, (int)elements, e);

@Override public Integer createElement(int element) {
return element;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@

@Test // Must be here for TestNG to find and run this, do not remove
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
final static long DefaultTimeoutMillis = 200;
final static long PublisherReferenceGCTimeoutMillis = 500;

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public UnboundedIntegerIncrementPublisherTest() {
super(new TestEnvironment(DefaultTimeoutMillis), PublisherReferenceGCTimeoutMillis);
super(new TestEnvironment());
}

@Override public Publisher<Integer> createPublisher(long elements) {
Expand Down
122 changes: 107 additions & 15 deletions tck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Specification rule abides the following naming convention: `spec###_DESC` where:
```

The prefixes of the names of the test methods are used in order to signify the character of the test. For example, these are the kinds of prefixes you may find:
"required_", "optional_", "stochastic_", "untested_".
"`required_`", "`optional_`", "`stochastic_`", "`untested_`".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(:


Explanations:

Expand Down Expand Up @@ -132,11 +132,8 @@ import org.reactivestreams.tck.TestEnvironment;

public class RangePublisherTest extends PublisherVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 1000L;

public RangePublisherTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
super(new TestEnvironment());
}

@Override
Expand Down Expand Up @@ -174,12 +171,83 @@ Notable configuration options include:
* `boundedDepthOfOnNextAndRequestRecursion` – which should only be overridden in case of synchronous Publishers. This number will be used to validate if a
`Subscription` actually solves the "unbounded recursion" problem (Rule 3.3).

### Timeout configuration
Publisher tests make use of two kinds of timeouts, one is the `defaultTimeoutMillis` which corresponds to all methods used
within the TCK which await for something to happen. The other timeout is `publisherReferenceGCTimeoutMillis` which is only used in order to verify
[Rule 3.13](https://github.com/reactive-streams/reactive-streams#3.13) which defines that subscriber references MUST be dropped
by the Publisher.

In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:

**Use env variables** to set these timeouts, in which case the you can just:

```bash
export DEFAULT_TIMEOUT_MILLIS=300
export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=500
```

Or **define the timeouts explicitly in code**:

```java
public class RangePublisherTest extends PublisherVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 300L;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;

public RangePublisherTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
}

// ...
}
```

Note that explicitly passed in values take precedence over values provided by the environment

## Subscriber Verification

Subscriber rules Verification is split up into two files (styles) of tests.

The Blackbox Verification tests do not require the implementation under test to be modified at all, yet they are *not* able to verify most rules. In Whitebox Verification, more control over `request()` calls etc. is required in order to validate rules more precisely.

### createElement and Helper Publisher implementations
Since testing a `Subscriber` is not possible without a corresponding `Publisher` the TCK Subscriber Verifications
both provide a default "*helper publisher*" to drive its test and also alow to replace this Publisher with a custom implementation.
The helper publisher is an asynchronous publisher by default - meaning that your subscriber can not blindly assume single threaded execution.

While the `Publisher` implementation is provided, creating the signal elements is not – this is because a given Subscriber
may for example only work with `HashedMessage` or some other specific kind of signal. The TCK is unable to generate such
special messages automatically, so we provide the `T createElement(Integer id)` method to be implemented as part of
Subscriber Verifications which should take the given ID and return an element of type `T` (where `T` is the type of
elements flowing into the `Subscriber<T>`, as known thanks to `... extends WhiteboxSubscriberVerification<T>`) representing
an element of the stream that will be passed on to the Subscriber.

The simplest valid implemenation is to return the incoming `id` *as the element* in a verification using `Integer`s as element types:

```java
public class MySubscriberTest extends SubscriberBlackboxVerification<Integer> {

// ...

@Override
public Integer createElement(int element) { return element; }
}
```


The `createElement` method MAY be called from multiple
threads, so in case of more complicated implementations, please be aware of this fact.

**Very Advanced**: While we do not expect many implementations having to do so, it is possible to take full control of the `Publisher`
which will be driving the TCKs test. You can do this by implementing the `createHelperPublisher` method in which you can implement your
own Publisher which will then be used by the TCK to drive your Subscriber tests:

```java
@Override public Publisher<Message> createHelperPublisher(long elements) {
return new Publisher<Message>() { /* IMPL HERE */ };
}
```

### Subscriber Blackbox Verification

Blackbox Verification does not require any additional work except from providing a `Subscriber` and `Publisher` instances to the TCK:
Expand All @@ -195,10 +263,8 @@ import org.reactivestreams.tck.TestEnvironment;

public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 300L;

public MySubscriberBlackboxVerificationTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
super(new TestEnvironment());
}

@Override
Expand All @@ -207,8 +273,8 @@ public class MySubscriberBlackboxVerificationTest extends SubscriberBlackboxVeri
}

@Override
public Publisher<Integer> createHelperPublisher(long elements) {
return new MyRangePublisher<Integer>(1, elements);
public Integer createElement(int element) {
return element;
}
}
```
Expand All @@ -235,10 +301,8 @@ import org.reactivestreams.tck.TestEnvironment;

public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 300L;

public MySubscriberWhiteboxVerificationTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS));
super(new TestEnvironment());
}

@Override
Expand Down Expand Up @@ -286,13 +350,41 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
}

@Override
public Publisher<Integer> createHelperPublisher(long elements) {
return new MyRangePublisher<Integer>(1, elements);
public Integer createElement(int element) {
return element;
}

}
```

### Timeout configuration
Similarily to `PublisherVerification`, it is possible to set the timeouts used by the TCK to validate subscriber behaviour.
This can be set either by using env variables or hardcoded explicitly.

**Use env variables** to set the timeout value to be used by the TCK:

```bash
export DEFAULT_TIMEOUT_MILLIS=300
```

Or **define the timeout explicitly in code**:

```java
public class MySubscriberTest extends BlackboxSubscriberVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 300L;

public RangePublisherTest() {
super(new MySubscriberTest(DEFAULT_TIMEOUT_MILLIS));
}

// ...
}
```

Note that hard-coded values *take precedence* over environment set values (!).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps better to word it as "Note that explicitly passed in values take precedence over values provided by the environment"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's better - thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// github is acting weird, I updated that line yet this comment remains expanded - https://github.com/reactive-streams/reactive-streams/pull/193/files#diff-4ce068920e11452416705ccb8f46ec78R205



## Subscription Verification

Please note that while `Subscription` does **not** have it's own test class, it's rules are validated inside of the `Publisher` and `Subscriber` tests – depending if the Rule demands specific action to be taken by the publishing, or subscribing side of the subscription contract.
Expand Down
1 change: 1 addition & 0 deletions tck/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ description = 'reactive-streams-tck'
dependencies {
compile group: 'org.testng', name: 'testng', version:'5.14.10'
compile project(':reactive-streams')
compile project(':reactive-streams-examples')
}
test.useTestNG()
Loading