Skip to content

Commit 9ce56b9

Browse files
committed
Review followups and rebase
* Make spec rules linkable (#2.13) and use links in TCK * Verified 1-element-publishers are testable with TCK (skips unsupported tests)
1 parent 1369af3 commit 9ce56b9

File tree

9 files changed

+268
-287
lines changed

9 files changed

+268
-287
lines changed

README.md

+54-54
Large diffs are not rendered by default.

api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static synchronized void addHandler(Handler handler) {
3535

3636
public static synchronized void removeHandler(Handler handler) {
3737
// too lazy to do the array handling
38-
HashSet<Handler> set = new HashSet<>(Arrays.asList(INSTANCE.handlers));
38+
HashSet<Handler> set = new HashSet<Handler>(Arrays.asList(INSTANCE.handlers));
3939
set.remove(handler);
4040
INSTANCE.handlers = set.toArray(new Handler[set.size()]);
4141
}

api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class StockPriceSubscriber implements Subscriber<Stock> {
1313
private final int take;
1414

1515
public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) {
16-
this.buffer = new ArrayBlockingQueue<>(bufferSize);
16+
this.buffer = new ArrayBlockingQueue<Stock>(bufferSize);
1717
this.delayPerStock = delayPerStock;
1818
this.take = take;
1919
}

api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
class NumberSubscriberThatHopsThreads implements Subscriber<Integer> {
99

1010
final int BUFFER_SIZE = 10;
11-
private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
11+
private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(BUFFER_SIZE);
1212
private volatile boolean terminated = false;
1313
private final String token;
1414

tck/src/main/java/org/reactivestreams/tck/Annotations.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ private Annotations() {}
1717
static @interface NotVerified {
1818
}
1919

20+
/**
21+
* Used to mark stochastic tests which MAY yield false positives (pass) can violate the tested rule in some specific scenario.
22+
*/
23+
@Target(ElementType.METHOD)
24+
@Retention(RetentionPolicy.SOURCE)
25+
static @interface Stochastic {
26+
27+
}
28+
2029
/**
2130
* Used to mark tests that MUST pass in all (even very restricted types of) Publishers / Subscribers.
2231
*/
@@ -31,7 +40,7 @@ private Annotations() {}
3140
*/
3241
@Target(ElementType.METHOD)
3342
@Retention(RetentionPolicy.SOURCE)
34-
@interface Additional {
43+
static @interface Additional {
3544
/** Description of situation when it's OK to not pass this test */
3645
String value() default "";
3746

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+44-41
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import org.reactivestreams.Publisher;
55
import org.reactivestreams.Subscriber;
66
import org.reactivestreams.Subscription;
7-
import org.reactivestreams.tck.Annotations.NotVerified;
8-
import org.reactivestreams.tck.Annotations.Required;
97
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
108
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
119
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
@@ -30,21 +28,25 @@ public abstract class IdentityProcessorVerification<T> {
3028

3129
////////////////// END OF DELEGATED TO SPECS //////////////////
3230

33-
34-
private final int testBufferSize;
31+
//
32+
private final int processorBufferSize;
3533

3634
/**
3735
* Test class must specify the expected time it takes for the publisher to
38-
* shut itself down when the the last downstream Subscription is cancelled.
39-
* Used by `publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe`.
36+
* shut itself down when the the last downstream {@code Subscription} is cancelled.
4037
*/
38+
@SuppressWarnings("unused")
4139
public IdentityProcessorVerification(TestEnvironment env, long publisherShutdownTimeoutMillis) {
4240
this(env, publisherShutdownTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
4341
}
4442

45-
public IdentityProcessorVerification(final TestEnvironment env, long publisherShutdownTimeoutMillis, int testBufferSize) {
43+
/**
44+
* Test class must specify the expected time it takes for the publisher to
45+
* shut itself down when the the last downstream {@code Subscription} is cancelled.
46+
*/
47+
public IdentityProcessorVerification(final TestEnvironment env, long publisherShutdownTimeoutMillis, int processorBufferSize) {
4648
this.env = env;
47-
this.testBufferSize = testBufferSize;
49+
this.processorBufferSize = processorBufferSize;
4850

4951
this.subscriberVerification = new SubscriberVerification<T>(env) {
5052
@Override
@@ -53,14 +55,14 @@ public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
5355
}
5456

5557
@Override
56-
public Publisher<T> createHelperPublisher(int elements) {
58+
public Publisher<T> createHelperPublisher(long elements) {
5759
return IdentityProcessorVerification.this.createHelperPublisher(elements);
5860
}
5961
};
6062

6163
publisherVerification = new PublisherVerification<T>(env, publisherShutdownTimeoutMillis) {
6264
@Override
63-
public Publisher<T> createPublisher(int elements) {
65+
public Publisher<T> createPublisher(long elements) {
6466
return IdentityProcessorVerification.this.createPublisher(elements);
6567
}
6668

@@ -80,17 +82,19 @@ public Publisher<T> createErrorStatePublisher() {
8082
* This is the main method you must implement in your test incarnation.
8183
* It must create a Publisher, which simply forwards all stream elements from its upstream
8284
* to its downstream. It must be able to internally buffer the given number of elements.
85+
*
86+
* @param bufferSize number of elements the processor is required to be able to buffer
8387
*/
8488
public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
8589

8690
/**
8791
* Helper method required for running the Publisher rules against a Publisher.
8892
* It must create a Publisher for a stream with exactly the given number of elements.
89-
* If `elements` is zero the produced stream must be infinite.
93+
* If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
9094
* The stream must not produce the same element twice (in case of an infinite stream this requirement
9195
* is relaxed to only apply to the elements that are actually requested during all tests).
9296
*/
93-
public abstract Publisher<T> createHelperPublisher(int elements);
97+
public abstract Publisher<T> createHelperPublisher(long elements);
9498

9599
/**
96100
* Return a Publisher in {@code completed} state in order to run additional tests on it,
@@ -112,22 +116,19 @@ public void setUp() throws Exception {
112116
}
113117

114118
////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
115-
// 4.1
116-
// A Processor represents a processing stage—which is both a Subscriber and a Publisher
117-
// It MUST obey the contracts of both [1]
118-
119119

120-
// A Publisher
121-
// must obey all Publisher rules on its producing side
122-
public Publisher<T> createPublisher(int elements) {
123-
Processor<T, T> processor = createIdentityProcessor(testBufferSize);
120+
// A Processor
121+
// must obey all Publisher rules on its publishing side
122+
public Publisher<T> createPublisher(long elements) {
123+
Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
124124
Publisher<T> pub = createHelperPublisher(elements);
125125
pub.subscribe(processor);
126126
return processor; // we run the PublisherVerification against this
127127
}
128128

129129
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
130-
130+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
131+
131132
@Test
132133
public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
133134
publisherVerification.createPublisher3MustProduceAStreamOfExactly3Elements();
@@ -144,7 +145,7 @@ public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws
144145
}
145146

146147
@Test
147-
public void spec103_mustSignalOnMethodsSequentially() throws Exception {
148+
public void spec103_mustSignalOnMethodsSequentially() throws Throwable {
148149
publisherVerification.spec103_mustSignalOnMethodsSequentially();
149150
}
150151

@@ -213,11 +214,11 @@ public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSu
213214
publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
214215
}
215216

216-
// A Processor
217-
// must call `onError` on all its subscribers if it encounters a non-recoverable error
217+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
218+
// for multiple subscribers
218219
@Test
219220
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
220-
new TestSetup(env, testBufferSize) {{
221+
new TestSetup(env, processorBufferSize) {{
221222
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
222223
env.subscribe(processor, sub1);
223224
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
@@ -242,11 +243,12 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera
242243
}
243244

244245
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
246+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
245247

246248
// A Processor
247249
// must obey all Subscriber rules on its consuming side
248250
public Subscriber<T> createSubscriber(final SubscriberVerification.SubscriberProbe<T> probe) {
249-
Processor<T, T> processor = createIdentityProcessor(testBufferSize);
251+
Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
250252
processor.subscribe(
251253
new Subscriber<T>() {
252254
public void onSubscribe(final Subscription subscription) {
@@ -288,7 +290,7 @@ public void onError(Throwable cause) {
288290
// must cancel its upstream Subscription if its last downstream Subscription has been cancelled
289291
@Test
290292
public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled() throws Exception {
291-
new TestSetup(env, testBufferSize) {{
293+
new TestSetup(env, processorBufferSize) {{
292294
ManualSubscriber<T> sub = newSubscriber();
293295
sub.cancel();
294296
expectCancelling();
@@ -301,7 +303,7 @@ public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasB
301303
// must immediately pass on `onError` events received from its upstream to its downstream
302304
@Test
303305
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
304-
new TestSetup(env, testBufferSize) {{
306+
new TestSetup(env, processorBufferSize) {{
305307
ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
306308
env.subscribe(processor, sub);
307309

@@ -317,7 +319,7 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
317319
// must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet
318320
@Test
319321
public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet() throws Exception {
320-
new TestSetup(env, testBufferSize) {{
322+
new TestSetup(env, processorBufferSize) {{
321323
ManualSubscriber<T> sub = newSubscriber();
322324
final T x = sendNextTFromUpstream();
323325
sub.expectNone(50);
@@ -337,6 +339,7 @@ public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstr
337339
}
338340

339341
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
342+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
340343

341344
@Test
342345
public void exerciseHappyPath() throws Throwable {
@@ -520,10 +523,11 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
520523

521524
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
522525

523-
// trigger `requestFromUpstream` for elements that have been requested 'long ago'
526+
// A Processor
527+
// must trigger `requestFromUpstream` for elements that have been requested 'long ago'
524528
@Test
525529
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Exception {
526-
new TestSetup(env, testBufferSize) {{
530+
new TestSetup(env, processorBufferSize) {{
527531
ManualSubscriber<T> sub1 = newSubscriber();
528532
sub1.request(20);
529533

@@ -566,19 +570,20 @@ public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() thr
566570
}};
567571
}
568572

569-
// unblock the stream if a 'blocking' subscription has been cancelled
573+
// A Processor
574+
// must unblock the stream if a 'blocking' subscription has been cancelled
570575
@Test
571576
@SuppressWarnings("unchecked")
572577
public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws InterruptedException {
573-
new TestSetup(env, testBufferSize) {{
578+
new TestSetup(env, processorBufferSize) {{
574579
ManualSubscriber<T> sub1 = newSubscriber();
575580
ManualSubscriber<T> sub2 = newSubscriber();
576581

577-
sub1.request(testBufferSize + 1);
582+
sub1.request(processorBufferSize + 1);
578583
long pending = 0;
579584
int sent = 0;
580-
final T[] tees = (T[]) new Object[testBufferSize];
581-
while (sent < testBufferSize) {
585+
final T[] tees = (T[]) new Object[processorBufferSize];
586+
while (sent < processorBufferSize) {
582587
if (pending == 0) {
583588
pending = expectRequest();
584589
}
@@ -588,7 +593,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
588593
pending -= 1;
589594
}
590595

591-
expectNoRequest(); // because we only have buffer size testBufferSize and sub2 hasn't seen the first value yet
596+
expectNoRequest(); // because we only have buffer size processorBufferSize and sub2 hasn't seen the first value yet
592597
sub2.cancel(); // must "unblock"
593598

594599
expectRequest();
@@ -606,16 +611,14 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
606611
/////////////////////// TEST INFRASTRUCTURE //////////////////////
607612

608613
public abstract class TestSetup extends ManualPublisher<T> {
609-
private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
614+
final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
610615
private Set<T> seenTees = new HashSet<T>();
611616

612617
final Processor<T, T> processor;
613-
final int testBufferSize;
614618

615619
public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
616620
super(env);
617-
this.testBufferSize = testBufferSize;
618-
tees = env.newManualSubscriber(createHelperPublisher(0));
621+
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
619622
processor = createIdentityProcessor(testBufferSize);
620623
subscribe(processor);
621624
}

0 commit comments

Comments
 (0)