Skip to content

Commit e01c0fb

Browse files
committed
Fixes #210 by removing 1.12 and repurposing its TCK checks for 1.09
1 parent ba6d042 commit e01c0fb

File tree

6 files changed

+73
-74
lines changed

6 files changed

+73
-74
lines changed

README.md

+2-3
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ public interface Publisher<T> {
9090
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
9191
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9292
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
93-
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
94-
| <a name="1.13">13</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
93+
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
9594

9695
[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.
9796

@@ -148,7 +147,7 @@ public interface Subscription {
148147
| <a name="3.11">11</a> | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). |
149148
| <a name="3.12">12</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. |
150149
| <a name="3.13">13</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. |
151-
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.13](#1.13)].
150+
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)].
152151
| <a name="3.15">15</a> | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
153152
| <a name="3.16">16</a> | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
154153
| <a name="3.17">17</a> | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. |

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109-
terminateDueTo(t); // Here we send onError, obeying rule 1.12
109+
terminateDueTo(t); // Here we send onError, obeying rule 1.09
110110
}
111111

112112
if (!cancelled) {
@@ -177,7 +177,7 @@ private void terminateDueTo(final Throwable t) {
177177
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
178178
try {
179179
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber`
180-
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.13, and all we can do is to log it.
180+
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it.
181181
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
182182
}
183183
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.util.HashSet;
1818
import java.util.Set;
1919

20-
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
20+
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
2121
implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
2222

2323
private final TestEnvironment env;
@@ -276,6 +276,11 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl
276276
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
277277
}
278278

279+
@Override @Test
280+
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
281+
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
282+
}
283+
279284
@Override @Test
280285
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
281286
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
@@ -287,23 +292,18 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
287292
}
288293

289294
@Override @Test
290-
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
291-
publisherVerification.required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
295+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
296+
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
292297
}
293298

294299
@Override @Test
295-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
296-
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
300+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
301+
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
297302
}
298303

299304
@Override @Test
300-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
301-
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
302-
}
303-
304-
@Override @Test
305-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
306-
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
305+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
306+
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
307307
}
308308

309309
@Override @Test
@@ -373,7 +373,7 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()
373373

374374
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
375375
// for multiple subscribers
376-
@Test
376+
@Test
377377
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
378378
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
379379
@Override

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+29-29
Original file line numberDiff line numberDiff line change
@@ -453,29 +453,9 @@ public void run(Publisher<T> pub) throws Throwable {
453453
});
454454
}
455455

456-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
457-
@Override @Test
458-
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
459-
notVerified(); // can we meaningfully test this?
460-
}
461-
462-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
456+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.09
463457
@Override @Test
464-
public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
465-
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
466-
@Override
467-
public void run(Publisher<T> pub) throws Throwable {
468-
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
469-
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
470-
471-
env.verifyNoAsyncErrors();
472-
}
473-
});
474-
}
475-
476-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
477-
@Override @Test
478-
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
458+
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
479459
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
480460
@Override
481461
public void run(Publisher<T> pub) throws Throwable {
@@ -496,13 +476,33 @@ public void onSubscribe(Subscription subs) {
496476
onErrorLatch.assertClosed("Should have received onError");
497477

498478
env.verifyNoAsyncErrors();
499-
}
479+
}
500480
});
501481
}
502482

503-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
483+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
484+
@Override @Test
485+
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
486+
notVerified(); // can we meaningfully test this?
487+
}
488+
489+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
504490
@Override @Test
505-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
491+
public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
492+
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
493+
@Override
494+
public void run(Publisher<T> pub) throws Throwable {
495+
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
496+
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
497+
498+
env.verifyNoAsyncErrors();
499+
}
500+
});
501+
}
502+
503+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
504+
@Override @Test
505+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
506506
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
507507
@Override
508508
public void run(Publisher<T> pub) throws InterruptedException {
@@ -551,9 +551,9 @@ public void run(Publisher<T> pub) throws InterruptedException {
551551
});
552552
}
553553

554-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
554+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
555555
@Override @Test
556-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
556+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
557557
optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
558558
@Override
559559
public void run(Publisher<T> pub) throws Throwable {
@@ -584,9 +584,9 @@ public void run(Publisher<T> pub) throws Throwable {
584584
});
585585
}
586586

587-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
587+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
588588
@Override @Test
589-
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
589+
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
590590
optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
591591
@Override
592592
public void run(Publisher<T> pub) throws Throwable {

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ public interface PublisherVerificationRules {
2222
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
2323
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
2424
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
25+
void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
2526
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
2627
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
27-
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
28-
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
29-
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
30-
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
28+
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
29+
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
30+
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
3131
void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable;
3232
void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable;
3333
void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception;

0 commit comments

Comments
 (0)