Skip to content

Commit e85c7b9

Browse files
committed
Modifies rules 1.09 and 2.13 to mandate java.lang.NullPointerException be thrown.
Updates the TCK, Spec and example implementations.
1 parent e521dbc commit e85c7b9

11 files changed

+204
-51
lines changed

README.md

+49-48
Large diffs are not rendered by default.

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ final class SubscriptionImpl implements Subscription, Runnable {
6666
private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber`
6767

6868
SubscriptionImpl(final Subscriber<? super T> subscriber) {
69+
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null`
6970
if (subscriber == null) throw null;
7071
this.subscriber = subscriber;
7172
}

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,23 @@ private void handleOnError(final Throwable error) {
143143
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144144

145145
@Override public final void onSubscribe(final Subscription s) {
146+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147+
if (s == null) throw null;
146148
signal(new OnSubscribe(s));
147149
}
148150

149151
@Override public final void onNext(final T element) {
152+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
153+
if (element == null) throw null;
154+
150155
signal(new OnNext<T>(element));
151156
}
152157

153158
@Override public final void onError(final Throwable t) {
154-
signal(new OnError(t));
159+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
160+
if (t == null) throw null;
161+
162+
signal(new OnError(t));
155163
}
156164

157165
@Override public final void onComplete() {

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
1515
private boolean done = false;
1616

1717
@Override public void onSubscribe(final Subscription s) {
18+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
19+
if (s == null) throw null;
20+
1821
if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
1922
try {
2023
s.cancel(); // Cancel the additional subscription
@@ -38,6 +41,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
3841
}
3942

4043
@Override public void onNext(final T element) {
44+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45+
if (element == null) throw null;
46+
4147
if (!done) { // If we aren't already done
4248
try {
4349
if (foreach(element)) {
@@ -80,6 +86,8 @@ private void done() {
8086
protected abstract boolean foreach(final T element);
8187

8288
@Override public void onError(final Throwable t) {
89+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90+
if (t == null) throw null;
8391
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
8492
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
8593
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+10
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
271271
publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
272272
}
273273

274+
@Override @Test
275+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
276+
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
277+
}
278+
274279
@Override @Test
275280
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
276281
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
@@ -558,6 +563,11 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
558563
subscriberVerification.untested_spec213_failingOnSignalInvocation();
559564
}
560565

566+
@Override @Test
567+
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
568+
subscriberVerification.required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull();
569+
}
570+
561571
@Override @Test
562572
public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
563573
subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,24 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
432432
notVerified(); // can we meaningfully test this?
433433
}
434434

435-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
436435
@Override @Test
437436
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
438-
notVerified(); // cannot be meaningfully tested, or can it?
437+
notVerified(); // can we meaningfully test this?
438+
}
439+
440+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
441+
@Override @Test
442+
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
443+
optionalActivePublisherTest(1, true, new PublisherTestRun<T>() {
444+
@Override
445+
public void run(Publisher<T> pub) throws Throwable {
446+
try {
447+
pub.subscribe(null);
448+
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
449+
} catch (NullPointerException npe) { }
450+
env.verifyNoAsyncErrors();
451+
}
452+
});
439453
}
440454

441455
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+55
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
import org.testng.annotations.BeforeMethod;
1515
import org.testng.annotations.Test;
1616

17+
import java.lang.NullPointerException;
18+
import java.lang.Override;
19+
import java.lang.RuntimeException;
1720
import java.util.concurrent.ExecutorService;
1821
import java.util.concurrent.Executors;
1922

2023
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
24+
import static org.testng.Assert.assertTrue;
2125

2226
/**
2327
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
@@ -310,6 +314,57 @@ public void untested_spec213_blackbox_failingOnSignalInvocation() throws Excepti
310314
notVerified(); // cannot be meaningfully tested, or can it?
311315
}
312316

317+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
318+
@Override @Test
319+
public void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
320+
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
321+
@Override
322+
public void run(BlackboxTestStage stage) throws Throwable {
323+
final Subscription subscription = new Subscription() {
324+
@Override public void request(final long elements) {}
325+
@Override public void cancel() {}
326+
};
327+
328+
{
329+
final Subscriber<T> sub = createSubscriber();
330+
boolean gotNPE = false;
331+
try {
332+
sub.onSubscribe(null);
333+
} catch(final NullPointerException expected) {
334+
gotNPE = true;
335+
}
336+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
337+
}
338+
339+
{
340+
final Subscriber<T> sub = createSubscriber();
341+
boolean gotNPE = false;
342+
sub.onSubscribe(subscription);
343+
try {
344+
sub.onNext(null);
345+
} catch(final NullPointerException expected) {
346+
gotNPE = true;
347+
}
348+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
349+
}
350+
351+
{
352+
final Subscriber<T> sub = createSubscriber();
353+
boolean gotNPE = false;
354+
sub.onSubscribe(subscription);
355+
try {
356+
sub.onError(null);
357+
} catch(final NullPointerException expected) {
358+
gotNPE = true;
359+
}
360+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
361+
}
362+
363+
env.verifyNoAsyncErrors();
364+
}
365+
});
366+
}
367+
313368
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
314369

315370
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+53
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,59 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
343343
notVerified(); // cannot be meaningfully tested, or can it?
344344
}
345345

346+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
347+
@Override @Test
348+
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
349+
subscriberTest(new TestStageTestRun() {
350+
@Override
351+
public void run(WhiteboxTestStage stage) throws Throwable {
352+
353+
final Subscription subscription = new Subscription() {
354+
@Override public void request(final long elements) {}
355+
@Override public void cancel() {}
356+
};
357+
358+
{
359+
final Subscriber<T> sub = createSubscriber(stage.probe());
360+
boolean gotNPE = false;
361+
try {
362+
sub.onSubscribe(null);
363+
} catch(final NullPointerException expected) {
364+
gotNPE = true;
365+
}
366+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
367+
}
368+
369+
{
370+
final Subscriber<T> sub = createSubscriber(stage.probe());
371+
boolean gotNPE = false;
372+
sub.onSubscribe(subscription);
373+
try {
374+
sub.onNext(null);
375+
} catch(final NullPointerException expected) {
376+
gotNPE = true;
377+
}
378+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
379+
}
380+
381+
{
382+
final Subscriber<T> sub = createSubscriber(stage.probe());
383+
boolean gotNPE = false;
384+
sub.onSubscribe(subscription);
385+
try {
386+
sub.onError(null);
387+
} catch(final NullPointerException expected) {
388+
gotNPE = true;
389+
}
390+
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
391+
}
392+
393+
env.verifyNoAsyncErrors();
394+
}
395+
});
396+
}
397+
398+
346399
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
347400

348401
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface PublisherVerificationRules {
2121
void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable;
2222
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
2323
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
24+
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
2425
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
2526
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
2627
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public interface SubscriberBlackboxVerificationRules {
2020
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2121
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
2222
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;
23+
void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2324
void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2425
void required_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2526
void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

tck/src/main/java/org/reactivestreams/tck/support/SubscriberWhiteboxVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public interface SubscriberWhiteboxVerificationRules {
2222
void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
2323
void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable;
2424
void untested_spec213_failingOnSignalInvocation() throws Exception;
25+
void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
2526
void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception;
2627
void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
2728
void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;

0 commit comments

Comments
 (0)