Skip to content

Commit 9333df9

Browse files
committed
Merge pull request #209 from reactive-streams/wip-204-spec-nulls-√
Modifies rules 1.09 and 2.13 to mandate `java.lang.NullPointerException`...
2 parents bf8ea54 + e2df130 commit 9333df9

12 files changed

+258
-9
lines changed

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
6565
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
6666
- The terms `emit`, `signal` or `send` are interchangeable. The specifications below will use `signal`.
6767
- The terms `synchronously` or `synchronous` refer to executing in the calling `Thread`.
68+
- The term "return normally" means "only throws exceptions that are explicitly allowed by the rule".
6869

6970
### SPECIFICATION
7071

@@ -86,7 +87,7 @@ public interface Publisher<T> {
8687
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8788
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8889
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
89-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally. The only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
90+
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
9091
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9192
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
9293
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
@@ -119,7 +120,7 @@ public interface Subscriber<T> {
119120
| <a name="2.10">10</a> | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. |
120121
| <a name="2.11">11</a> | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [[1]](#footnote-2-1) the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic. |
121122
| <a name="2.12">12</a> | `Subscriber.onSubscribe` MUST be called at most once for a given `Subscriber` (based on object equality). |
122-
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally. The only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
123+
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally except when any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
123124
124125
[<a name="footnote-2-1">1</a>] : See JMM definition of Happen-Before in section 17.4.5. on http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html
125126

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ final class SubscriptionImpl implements Subscription, Runnable {
6666
private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber`
6767

6868
SubscriptionImpl(final Subscriber<? super T> subscriber) {
69+
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null`
6970
if (subscriber == null) throw null;
7071
this.subscriber = subscriber;
7172
}

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,23 @@ private void handleOnError(final Throwable error) {
143143
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144144

145145
@Override public final void onSubscribe(final Subscription s) {
146+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147+
if (s == null) throw null;
146148
signal(new OnSubscribe(s));
147149
}
148150

149151
@Override public final void onNext(final T element) {
152+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
153+
if (element == null) throw null;
154+
150155
signal(new OnNext<T>(element));
151156
}
152157

153158
@Override public final void onError(final Throwable t) {
154-
signal(new OnError(t));
159+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
160+
if (t == null) throw null;
161+
162+
signal(new OnError(t));
155163
}
156164

157165
@Override public final void onComplete() {

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
1515
private boolean done = false;
1616

1717
@Override public void onSubscribe(final Subscription s) {
18+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
19+
if (s == null) throw null;
20+
1821
if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
1922
try {
2023
s.cancel(); // Cancel the additional subscription
@@ -38,6 +41,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
3841
}
3942

4043
@Override public void onNext(final T element) {
44+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45+
if (element == null) throw null;
46+
4147
if (!done) { // If we aren't already done
4248
try {
4349
if (foreach(element)) {
@@ -80,6 +86,8 @@ private void done() {
8086
protected abstract boolean foreach(final T element);
8187

8288
@Override public void onError(final Throwable t) {
89+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90+
if (t == null) throw null;
8391
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
8492
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
8593
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+18
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
271271
publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
272272
}
273273

274+
@Override @Test
275+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
276+
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
277+
}
278+
274279
@Override @Test
275280
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
276281
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
@@ -558,6 +563,19 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
558563
subscriberVerification.untested_spec213_failingOnSignalInvocation();
559564
}
560565

566+
@Override @Test
567+
public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
568+
subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
569+
}
570+
@Override @Test
571+
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
572+
subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
573+
}
574+
@Override @Test
575+
public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
576+
subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
577+
}
578+
561579
@Override @Test
562580
public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
563581
subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -432,13 +432,28 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
432432
notVerified(); // can we meaningfully test this?
433433
}
434434

435-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
436435
@Override @Test
437436
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
438-
notVerified(); // cannot be meaningfully tested, or can it?
437+
notVerified(); // can we meaningfully test this?
438+
}
439+
440+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
441+
@Override @Test
442+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
443+
activePublisherTest(0, false, new PublisherTestRun<T>() {
444+
@Override
445+
public void run(Publisher<T> pub) throws Throwable {
446+
try {
447+
pub.subscribe(null);
448+
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
449+
} catch (NullPointerException npe) {
450+
}
451+
env.verifyNoAsyncErrors();
452+
}
453+
});
439454
}
440455

441-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
456+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
442457
@Override @Test
443458
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
444459
notVerified(); // can we meaningfully test this?

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+80
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.concurrent.Executors;
1919

2020
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
21+
import static org.testng.Assert.assertTrue;
2122

2223
/**
2324
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
@@ -310,6 +311,85 @@ public void untested_spec213_blackbox_failingOnSignalInvocation() throws Excepti
310311
notVerified(); // cannot be meaningfully tested, or can it?
311312
}
312313

314+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
315+
@Override @Test
316+
public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
317+
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
318+
@Override
319+
public void run(BlackboxTestStage stage) throws Throwable {
320+
321+
{
322+
final Subscriber<T> sub = createSubscriber();
323+
boolean gotNPE = false;
324+
try {
325+
sub.onSubscribe(null);
326+
} catch(final NullPointerException expected) {
327+
gotNPE = true;
328+
}
329+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
330+
}
331+
332+
env.verifyNoAsyncErrors();
333+
}
334+
});
335+
}
336+
337+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
338+
@Override @Test
339+
public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
340+
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
341+
@Override
342+
public void run(BlackboxTestStage stage) throws Throwable {
343+
final Subscription subscription = new Subscription() {
344+
@Override public void request(final long elements) {}
345+
@Override public void cancel() {}
346+
};
347+
348+
{
349+
final Subscriber<T> sub = createSubscriber();
350+
boolean gotNPE = false;
351+
sub.onSubscribe(subscription);
352+
try {
353+
sub.onNext(null);
354+
} catch(final NullPointerException expected) {
355+
gotNPE = true;
356+
}
357+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
358+
}
359+
360+
env.verifyNoAsyncErrors();
361+
}
362+
});
363+
}
364+
365+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
366+
@Override @Test
367+
public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
368+
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
369+
@Override
370+
public void run(BlackboxTestStage stage) throws Throwable {
371+
final Subscription subscription = new Subscription() {
372+
@Override public void request(final long elements) {}
373+
@Override public void cancel() {}
374+
};
375+
376+
{
377+
final Subscriber<T> sub = createSubscriber();
378+
boolean gotNPE = false;
379+
sub.onSubscribe(subscription);
380+
try {
381+
sub.onError(null);
382+
} catch(final NullPointerException expected) {
383+
gotNPE = true;
384+
}
385+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
386+
}
387+
388+
env.verifyNoAsyncErrors();
389+
}
390+
});
391+
}
392+
313393
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
314394

315395
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+81-3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.tck.TestEnvironment.*;
7-
import org.reactivestreams.tck.support.Function;
87
import org.reactivestreams.tck.support.Optional;
9-
import org.reactivestreams.tck.support.TestException;
108
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
9+
import org.reactivestreams.tck.support.TestException;
1110
import org.testng.SkipException;
1211
import org.testng.annotations.AfterClass;
1312
import org.testng.annotations.BeforeClass;
@@ -17,7 +16,6 @@
1716
import java.util.concurrent.ExecutorService;
1817
import java.util.concurrent.Executors;
1918

20-
import static org.testng.Assert.assertEquals;
2119
import static org.testng.Assert.assertTrue;
2220

2321
/**
@@ -343,6 +341,86 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
343341
notVerified(); // cannot be meaningfully tested, or can it?
344342
}
345343

344+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
345+
@Override @Test
346+
public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
347+
subscriberTest(new TestStageTestRun() {
348+
@Override
349+
public void run(WhiteboxTestStage stage) throws Throwable {
350+
351+
{
352+
final Subscriber<T> sub = createSubscriber(stage.probe());
353+
boolean gotNPE = false;
354+
try {
355+
sub.onSubscribe(null);
356+
} catch(final NullPointerException expected) {
357+
gotNPE = true;
358+
}
359+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
360+
}
361+
362+
env.verifyNoAsyncErrors();
363+
}
364+
});
365+
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
366+
@Override @Test
367+
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
368+
subscriberTest(new TestStageTestRun() {
369+
@Override
370+
public void run(WhiteboxTestStage stage) throws Throwable {
371+
372+
final Subscription subscription = new Subscription() {
373+
@Override public void request(final long elements) {}
374+
@Override public void cancel() {}
375+
};
376+
377+
{
378+
final Subscriber<T> sub = createSubscriber(stage.probe());
379+
boolean gotNPE = false;
380+
sub.onSubscribe(subscription);
381+
try {
382+
sub.onNext(null);
383+
} catch(final NullPointerException expected) {
384+
gotNPE = true;
385+
}
386+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
387+
}
388+
389+
env.verifyNoAsyncErrors();
390+
}
391+
});
392+
}
393+
394+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
395+
@Override @Test
396+
public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
397+
subscriberTest(new TestStageTestRun() {
398+
@Override
399+
public void run(WhiteboxTestStage stage) throws Throwable {
400+
401+
final Subscription subscription = new Subscription() {
402+
@Override public void request(final long elements) {}
403+
@Override public void cancel() {}
404+
};
405+
406+
{
407+
final Subscriber<T> sub = createSubscriber(stage.probe());
408+
boolean gotNPE = false;
409+
sub.onSubscribe(subscription);
410+
try {
411+
sub.onError(null);
412+
} catch(final NullPointerException expected) {
413+
gotNPE = true;
414+
}
415+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
416+
}
417+
418+
env.verifyNoAsyncErrors();
419+
}
420+
});
421+
}
422+
423+
346424
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
347425

348426
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface PublisherVerificationRules {
2121
void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable;
2222
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
2323
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
24+
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
2425
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
2526
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
2627
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ public interface SubscriberBlackboxVerificationRules {
2020
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2121
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
2222
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;
23+
void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
24+
void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
25+
void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2326
void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2427
void required_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2528
void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public interface SubscriberWhiteboxVerificationRules {
2222
void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2323
void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable;
2424
void untested_spec213_failingOnSignalInvocation() throws Exception;
25+
void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
26+
void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
27+
void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2528
void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2629
void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2730
void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

0 commit comments

Comments
 (0)