Skip to content

Commit 46cdc0a

Browse files
committed
Modifies rules 1.09 and 2.13 to mandate java.lang.NullPointerException be thrown.
Updates the TCK, Spec and example implementations.
1 parent bf8ea54 commit 46cdc0a

11 files changed

+158
-5
lines changed

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
6565
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
6666
- The terms `emit`, `signal` or `send` are interchangeable. The specifications below will use `signal`.
6767
- The terms `synchronously` or `synchronous` refer to executing in the calling `Thread`.
68+
- The term "return normally" means "only throws exceptions that are explicitly allowed by the rule".
6869

6970
### SPECIFICATION
7071

@@ -86,7 +87,7 @@ public interface Publisher<T> {
8687
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8788
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8889
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
89-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally. The only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
90+
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except in the case where the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, this aside the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
9091
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9192
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
9293
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
@@ -119,7 +120,7 @@ public interface Subscriber<T> {
119120
| <a name="2.10">10</a> | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. |
120121
| <a name="2.11">11</a> | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [[1]](#footnote-2-1) the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic. |
121122
| <a name="2.12">12</a> | `Subscriber.onSubscribe` MUST be called at most once for a given `Subscriber` (based on object equality). |
122-
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally. The only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
123+
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally except in the case where any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, this aside the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
123124
124125
[<a name="footnote-2-1">1</a>] : See JMM definition of Happen-Before in section 17.4.5. on http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html
125126

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ final class SubscriptionImpl implements Subscription, Runnable {
6666
private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber`
6767

6868
SubscriptionImpl(final Subscriber<? super T> subscriber) {
69+
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null`
6970
if (subscriber == null) throw null;
7071
this.subscriber = subscriber;
7172
}

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,23 @@ private void handleOnError(final Throwable error) {
143143
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144144

145145
@Override public final void onSubscribe(final Subscription s) {
146+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147+
if (s == null) throw null;
146148
signal(new OnSubscribe(s));
147149
}
148150

149151
@Override public final void onNext(final T element) {
152+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
153+
if (element == null) throw null;
154+
150155
signal(new OnNext<T>(element));
151156
}
152157

153158
@Override public final void onError(final Throwable t) {
154-
signal(new OnError(t));
159+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
160+
if (t == null) throw null;
161+
162+
signal(new OnError(t));
155163
}
156164

157165
@Override public final void onComplete() {

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
1515
private boolean done = false;
1616

1717
@Override public void onSubscribe(final Subscription s) {
18+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
19+
if (s == null) throw null;
20+
1821
if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
1922
try {
2023
s.cancel(); // Cancel the additional subscription
@@ -38,6 +41,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
3841
}
3942

4043
@Override public void onNext(final T element) {
44+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45+
if (element == null) throw null;
46+
4147
if (!done) { // If we aren't already done
4248
try {
4349
if (foreach(element)) {
@@ -80,6 +86,8 @@ private void done() {
8086
protected abstract boolean foreach(final T element);
8187

8288
@Override public void onError(final Throwable t) {
89+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90+
if (t == null) throw null;
8391
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
8492
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
8593
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+10
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
271271
publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
272272
}
273273

274+
@Override @Test
275+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
276+
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
277+
}
278+
274279
@Override @Test
275280
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
276281
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
@@ -558,6 +563,11 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
558563
subscriberVerification.untested_spec213_failingOnSignalInvocation();
559564
}
560565

566+
@Override @Test
567+
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
568+
subscriberVerification.required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull();
569+
}
570+
561571
@Override @Test
562572
public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
563573
subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,24 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
432432
notVerified(); // can we meaningfully test this?
433433
}
434434

435-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
436435
@Override @Test
437436
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
438-
notVerified(); // cannot be meaningfully tested, or can it?
437+
notVerified(); // can we meaningfully test this?
438+
}
439+
440+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
441+
@Override @Test
442+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
443+
optionalActivePublisherTest(1, true, new PublisherTestRun<T>() {
444+
@Override
445+
public void run(Publisher<T> pub) throws Throwable {
446+
try {
447+
pub.subscribe(null);
448+
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
449+
} catch (NullPointerException npe) { }
450+
env.verifyNoAsyncErrors();
451+
}
452+
});
439453
}
440454

441455
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+55
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
import org.testng.annotations.BeforeMethod;
1515
import org.testng.annotations.Test;
1616

17+
import java.lang.NullPointerException;
18+
import java.lang.Override;
19+
import java.lang.RuntimeException;
1720
import java.util.concurrent.ExecutorService;
1821
import java.util.concurrent.Executors;
1922

2023
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
24+
import static org.testng.Assert.assertTrue;
2125

2226
/**
2327
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
@@ -310,6 +314,57 @@ public void untested_spec213_blackbox_failingOnSignalInvocation() throws Excepti
310314
notVerified(); // cannot be meaningfully tested, or can it?
311315
}
312316

317+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
318+
@Override @Test
319+
public void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
320+
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
321+
@Override
322+
public void run(BlackboxTestStage stage) throws Throwable {
323+
final Subscription subscription = new Subscription() {
324+
@Override public void request(final long elements) {}
325+
@Override public void cancel() {}
326+
};
327+
328+
{
329+
final Subscriber<T> sub = createSubscriber();
330+
boolean gotNPE = false;
331+
try {
332+
sub.onSubscribe(null);
333+
} catch(final NullPointerException expected) {
334+
gotNPE = true;
335+
}
336+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
337+
}
338+
339+
{
340+
final Subscriber<T> sub = createSubscriber();
341+
boolean gotNPE = false;
342+
sub.onSubscribe(subscription);
343+
try {
344+
sub.onNext(null);
345+
} catch(final NullPointerException expected) {
346+
gotNPE = true;
347+
}
348+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
349+
}
350+
351+
{
352+
final Subscriber<T> sub = createSubscriber();
353+
boolean gotNPE = false;
354+
sub.onSubscribe(subscription);
355+
try {
356+
sub.onError(null);
357+
} catch(final NullPointerException expected) {
358+
gotNPE = true;
359+
}
360+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
361+
}
362+
363+
env.verifyNoAsyncErrors();
364+
}
365+
});
366+
}
367+
313368
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
314369

315370
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+53
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,59 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
343343
notVerified(); // cannot be meaningfully tested, or can it?
344344
}
345345

346+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
347+
@Override @Test
348+
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
349+
subscriberTest(new TestStageTestRun() {
350+
@Override
351+
public void run(WhiteboxTestStage stage) throws Throwable {
352+
353+
final Subscription subscription = new Subscription() {
354+
@Override public void request(final long elements) {}
355+
@Override public void cancel() {}
356+
};
357+
358+
{
359+
final Subscriber<T> sub = createSubscriber(stage.probe());
360+
boolean gotNPE = false;
361+
try {
362+
sub.onSubscribe(null);
363+
} catch(final NullPointerException expected) {
364+
gotNPE = true;
365+
}
366+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
367+
}
368+
369+
{
370+
final Subscriber<T> sub = createSubscriber(stage.probe());
371+
boolean gotNPE = false;
372+
sub.onSubscribe(subscription);
373+
try {
374+
sub.onNext(null);
375+
} catch(final NullPointerException expected) {
376+
gotNPE = true;
377+
}
378+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
379+
}
380+
381+
{
382+
final Subscriber<T> sub = createSubscriber(stage.probe());
383+
boolean gotNPE = false;
384+
sub.onSubscribe(subscription);
385+
try {
386+
sub.onError(null);
387+
} catch(final NullPointerException expected) {
388+
gotNPE = true;
389+
}
390+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
391+
}
392+
393+
env.verifyNoAsyncErrors();
394+
}
395+
});
396+
}
397+
398+
346399
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
347400

348401
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface PublisherVerificationRules {
2121
void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable;
2222
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
2323
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
24+
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
2425
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
2526
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
2627
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public interface SubscriberBlackboxVerificationRules {
2020
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2121
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
2222
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;
23+
void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2324
void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2425
void required_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2526
void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public interface SubscriberWhiteboxVerificationRules {
2222
void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2323
void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable;
2424
void untested_spec213_failingOnSignalInvocation() throws Exception;
25+
void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2526
void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2627
void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2728
void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

0 commit comments

Comments
 (0)