Skip to content

Commit c3dd8a1

Browse files
committed
!tck from createHelper to createElement style
1 parent 9efb141 commit c3dd8a1

14 files changed

+226
-418
lines changed

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ public AsyncSubscriberTest() {
5656
latch.await(DefaultTimeoutMillis * 10, TimeUnit.MILLISECONDS);
5757
assertEquals(i.get(), 45);
5858
}
59+
60+
@Override public Integer createElement(int element) {
61+
return element;
62+
}
5963
}

examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,7 @@ public SyncSubscriberTest() {
3737
};
3838
}
3939

40+
@Override public Integer createElement(int element) {
41+
return element;
42+
}
4043
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import java.util.HashSet;
1717
import java.util.Set;
1818

19-
public abstract class IdentityProcessorVerification<T> {
19+
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> {
2020

2121
private final TestEnvironment env;
2222

@@ -64,6 +64,10 @@ public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
6464
return IdentityProcessorVerification.this.createSubscriber(probe);
6565
}
6666

67+
@Override public T createElement(int element) {
68+
return IdentityProcessorVerification.this.createElement(element);
69+
}
70+
6771
@Override
6872
public Publisher<T> createHelperPublisher(long elements) {
6973
return IdentityProcessorVerification.this.createHelperPublisher(elements);
@@ -108,15 +112,24 @@ public boolean skipStochasticTests() {
108112
public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
109113

110114
/**
111-
* Helper method required for running the Publisher rules against a Publisher.
112-
* It must create a Publisher for a stream with exactly the given number of elements.
113-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
114-
*
115-
* The stream must not produce the same element twice (in case of an infinite stream this requirement
116-
* is relaxed to only apply to the elements that are actually requested during all tests).
117-
*
118-
* @param elements exact number of elements this publisher should emit,
119-
* unless equal to {@code Long.MAX_VALUE} in which case the stream should be effectively infinite
115+
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
116+
* <p>
117+
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
118+
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
119+
* your Subscriber is able to consume.
120+
* <p>
121+
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
122+
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
123+
* (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
124+
* <ul>
125+
* <li>
126+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
127+
* </li>
128+
* <li>
129+
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
130+
* In other words, it should represent a "completed stream".
131+
* </li>
132+
* </ul>
120133
*/
121134
public abstract Publisher<T> createHelperPublisher(long elements);
122135

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+2-63
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,16 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6-
import org.reactivestreams.example.unicast.InfiniteIncrementNumberPublisher;
7-
import org.reactivestreams.example.unicast.NumberIterablePublisher;
86
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
97
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
108
import org.reactivestreams.tck.support.Optional;
119
import org.reactivestreams.tck.support.TestException;
12-
import org.reactivestreams.tck.support.UnableToProvidePublisherException;
1310
import org.testng.SkipException;
1411
import org.testng.annotations.AfterClass;
1512
import org.testng.annotations.BeforeClass;
1613
import org.testng.annotations.BeforeMethod;
17-
import org.testng.annotations.BeforeTest;
1814
import org.testng.annotations.Test;
1915

20-
import java.lang.reflect.ParameterizedType;
2116
import java.util.concurrent.ExecutorService;
2217
import java.util.concurrent.Executors;
2318

@@ -36,7 +31,7 @@
3631
* @see org.reactivestreams.Subscriber
3732
* @see org.reactivestreams.Subscription
3833
*/
39-
public abstract class SubscriberBlackboxVerification<T> {
34+
public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> {
4035

4136
private final TestEnvironment env;
4237

@@ -61,63 +56,7 @@ protected SubscriberBlackboxVerification(TestEnvironment env) {
6156
private ExecutorService publisherExecutor;
6257
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
6358
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
64-
public ExecutorService publisherExecutorService() { return publisherExecutor; }
65-
66-
/**
67-
* Returns the Class of the {@code <T>} type parameter with which this SubscriberVerification was instanciated.
68-
* This is used to validate if the default provided helper {@link Publisher} can drive the tests for the tested
69-
* {@link Subscriber} or not.
70-
*
71-
* @see #createHelperPublisher(long)
72-
*/
73-
@SuppressWarnings("unchecked")
74-
public Class<T> elementClass() {
75-
return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
76-
}
77-
78-
/**
79-
* Validates if the required element type is able to be signalled by the default Publisher.
80-
* <p>
81-
* If it is able to create such Publisher returns {@code true},
82-
* otherwise throws an {@link org.reactivestreams.tck.support.UnableToProvidePublisherException}.
83-
*/
84-
@BeforeTest public boolean validateAbilityToProvidePublisher() {
85-
if (!elementClass().isAssignableFrom(Integer.class)) {
86-
throw new UnableToProvidePublisherException(String.format("Unable to provide Publisher of `%s` elements (can only emit `java.lang.Integer`)!", elementClass().getCanonicalName()));
87-
} else return true;
88-
}
89-
90-
/**
91-
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
92-
* <p>
93-
* By default an <b>Asynchronous Integer signalling Publisher</b> is provided, however if this SubscriberVerification
94-
* was created using a different type (and your Subscriber is only able to consume those signals (e.g. "MyMessage"))
95-
* the default provided Publisher will NOT be able to generate those messages and will fail the entire test class and
96-
* asking you to provide a custom {@link Publisher} which is able to generate elements of type {@code T}.
97-
* <p>
98-
* When implementing a custom helper Publisher it MUST emit the exact number of elements asked for (via the {@code elements}
99-
* parameter) and MUST also must treat the following numbers of elements in these specific ways:
100-
* <ul>
101-
* <li>
102-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
103-
* </li>
104-
* <li>
105-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
106-
* In other words, it should represent a "completed stream".
107-
* </li>
108-
* </ul>
109-
*/
110-
@SuppressWarnings("unchecked")
111-
public Publisher<T> createHelperPublisher(long elements) {
112-
if (validateAbilityToProvidePublisher()) {
113-
// we can safely provide the Integer based Publishers
114-
if (elements > Integer.MAX_VALUE) {
115-
return (Publisher<T>) new InfiniteIncrementNumberPublisher(publisherExecutorService());
116-
} else {
117-
return (Publisher<T>) new NumberIterablePublisher(0, (int)elements, publisherExecutorService());
118-
}
119-
} else throw new RuntimeException("Unreachable. `validateAbilityToProvidePublisher` should have thrown!");
120-
}
59+
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
12160

12261
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
12362

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+2-64
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,16 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6-
import org.reactivestreams.example.unicast.InfiniteIncrementNumberPublisher;
7-
import org.reactivestreams.example.unicast.NumberIterablePublisher;
86
import org.reactivestreams.tck.Annotations.Additional;
97
import org.reactivestreams.tck.TestEnvironment.*;
10-
import org.reactivestreams.tck.support.Function;
118
import org.reactivestreams.tck.support.Optional;
129
import org.reactivestreams.tck.support.TestException;
13-
import org.reactivestreams.tck.support.UnableToProvidePublisherException;
1410
import org.testng.SkipException;
1511
import org.testng.annotations.AfterClass;
1612
import org.testng.annotations.BeforeClass;
1713
import org.testng.annotations.BeforeMethod;
18-
import org.testng.annotations.BeforeTest;
1914
import org.testng.annotations.Test;
2015

21-
import java.lang.reflect.ParameterizedType;
2216
import java.util.concurrent.ExecutorService;
2317
import java.util.concurrent.Executors;
2418

@@ -33,7 +27,7 @@
3327
* @see org.reactivestreams.Subscriber
3428
* @see org.reactivestreams.Subscription
3529
*/
36-
public abstract class SubscriberWhiteboxVerification<T> {
30+
public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T> {
3731

3832
private final TestEnvironment env;
3933

@@ -61,63 +55,7 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
6155
private ExecutorService publisherExecutor;
6256
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
6357
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
64-
public ExecutorService publisherExecutorService() { return publisherExecutor; }
65-
66-
/**
67-
* Returns the Class of the {@code <T>} type parameter with which this SubscriberVerification was instanciated.
68-
* This is used to validate if the default provided helper {@link Publisher} can drive the tests for the tested
69-
* {@link Subscriber} or not.
70-
*
71-
* @see #createHelperPublisher(long)
72-
*/
73-
@SuppressWarnings("unchecked")
74-
public Class<T> elementClass() {
75-
return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
76-
}
77-
78-
/**
79-
* Validates if the required element type is able to be signalled by the default Publisher.
80-
* <p>
81-
* If it is able to create such Publisher returns {@code true},
82-
* otherwise throws an {@link org.reactivestreams.tck.support.UnableToProvidePublisherException}.
83-
*/
84-
@BeforeTest public boolean validateAbilityToProvidePublisher() {
85-
if (!elementClass().isAssignableFrom(Integer.class)) {
86-
throw new UnableToProvidePublisherException(String.format("Unable to provide Publisher of `%s` elements (can only emit `java.lang.Integer`)!", elementClass().getCanonicalName()));
87-
} else return true;
88-
}
89-
90-
/**
91-
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
92-
* <p>
93-
* By default an <b>Asynchronous Integer signalling Publisher</b> is provided, however if this SubscriberVerification
94-
* was created using a different type (and your Subscriber is only able to consume those signals (e.g. "MyMessage"))
95-
* the default provided Publisher will NOT be able to generate those messages and will fail the entire test class and
96-
* asking you to provide a custom {@link Publisher} which is able to generate elements of type {@code T}.
97-
* <p>
98-
* When implementing a custom helper Publisher it MUST emit the exact number of elements asked for (via the {@code elements}
99-
* parameter) and MUST also must treat the following numbers of elements in these specific ways:
100-
* <ul>
101-
* <li>
102-
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
103-
* </li>
104-
* <li>
105-
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
106-
* In other words, it should represent a "completed stream".
107-
* </li>
108-
* </ul>
109-
*/
110-
@SuppressWarnings("unchecked")
111-
public Publisher<T> createHelperPublisher(long elements) {
112-
if (validateAbilityToProvidePublisher()) {
113-
// we can safely provide the Integer based Publishers
114-
if (elements > Integer.MAX_VALUE) {
115-
return (Publisher<T>) new InfiniteIncrementNumberPublisher(publisherExecutorService());
116-
} else {
117-
return (Publisher<T>) new NumberIterablePublisher(0, (int)elements, publisherExecutorService());
118-
}
119-
} else throw new RuntimeException("Unreachable. `validateAbilityToProvidePublisher` should have thrown!");
120-
}
58+
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
12159

12260
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
12361

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.reactivestreams.tck;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.tck.support.Function;
5+
import org.reactivestreams.tck.support.HelperPublisher;
6+
import org.reactivestreams.tck.support.InfiniteHelperPublisher;
7+
8+
import java.util.concurrent.ExecutorService;
9+
10+
/**
11+
* Type which is able to create elements based on a seed {@code id} value.
12+
* <p>
13+
* Simplest implementations will simply return the incoming id as the element.
14+
*
15+
* @param <T> type of element to be delivered to the Subscriber
16+
*/
17+
public abstract class WithHelperPublisher<T> {
18+
19+
/** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */
20+
public abstract ExecutorService publisherExecutorService();
21+
22+
/**
23+
* Implement this method to match your expected element type.
24+
* In case of implementing a simple Subscriber which is able to consume any kind of element simply return the
25+
* incoming {@code element} element.
26+
* <p/>
27+
* Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement
28+
* this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be
29+
* to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier.
30+
*
31+
* @return element of the matching type {@code T} that will be delivered to the tested Subscriber
32+
*/
33+
public abstract T createElement(int element);
34+
35+
/**
36+
* Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
37+
* <p>
38+
* By default an <b>asynchronously signalling Publisher</b> is provided, which will use
39+
* {@link org.reactivestreams.tck.SubscriberBlackboxVerification#createElement(int)} to generate elements type
40+
* your Subscriber is able to consume.
41+
* <p>
42+
* Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
43+
* when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
44+
* (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
45+
* <ul>
46+
* <li>
47+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
48+
* </li>
49+
* <li>
50+
* If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
51+
* In other words, it should represent a "completed stream".
52+
* </li>
53+
* </ul>
54+
*/
55+
@SuppressWarnings("unchecked")
56+
public Publisher<T> createHelperPublisher(long elements) {
57+
final Function<Integer, T> mkElement = new Function<Integer, T>() {
58+
@Override public T apply(Integer id) throws Throwable {
59+
return createElement(id);
60+
}
61+
};
62+
63+
if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService());
64+
else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService());
65+
}
66+
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.reactivestreams.tck.support;
2+
3+
import java.util.Collections;
4+
import java.util.Iterator;
5+
import java.util.concurrent.Executor;
6+
import org.reactivestreams.Subscription;
7+
import org.reactivestreams.Subscriber;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
10+
11+
public class HelperPublisher<T> extends AsyncIterablePublisher<T> {
12+
13+
public HelperPublisher(final int from, final int to, final Function<Integer, T> create, final Executor executor) {
14+
super(new Iterable<T>() {
15+
{ if(from > to) throw new IllegalArgumentException("from must be equal or greater than to!"); }
16+
@Override public Iterator<T> iterator() {
17+
return new Iterator<T>() {
18+
private int at = from;
19+
@Override public boolean hasNext() { return at < to; }
20+
@Override public T next() {
21+
if (!hasNext()) return Collections.<T>emptyList().iterator().next();
22+
else try {
23+
return create.apply(at++);
24+
} catch (Throwable t) {
25+
throw new HelperPublisherException(
26+
String.format("Failed to create element in %s for id %s!", getClass().getSimpleName(), at - 1), t);
27+
}
28+
}
29+
@Override public void remove() { throw new UnsupportedOperationException(); }
30+
};
31+
}
32+
}, executor);
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivestreams.tck.support;
2+
3+
public final class HelperPublisherException extends RuntimeException {
4+
public HelperPublisherException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
}

0 commit comments

Comments
 (0)