Skip to content

+tck #236 example subscriber whitebox tested, and whitebox fixed #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.reactivestreams.example.unicast;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public SyncSubscriberWhiteboxTest() {
super(new TestEnvironment());
}

@Override
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
return new SyncSubscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
super.onSubscribe(s);

probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long elements) {
s.request(elements);
}

@Override
public void signalCancel() {
s.cancel();
}
});
}

@Override
public void onNext(Integer element) {
super.onNext(element);
probe.registerOnNext(element);
}

@Override
public void onError(Throwable cause) {
super.onError(cause);
probe.registerOnError(cause);
}

@Override
public void onComplete() {
super.onComplete();
probe.registerOnComplete();
}

@Override
protected boolean foreach(Integer element) {
return true;
}
};
}

@Override public Integer createElement(int element) {
return element;
}

}
34 changes: 19 additions & 15 deletions tck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ Based on experiences so far implementing the `SubscriberPuppet` is non-trivial a
We keep the whitebox verification, as it is tremendously useful in the `ProcessorVerification`, where the Puppet is implemented within the TCK and injected to the tests.
We do not expect all implementations to make use of the plain `SubscriberWhiteboxVerification`, using the `SubscriberBlackboxVerification` instead.

A simple synchronous `Subscriber` implementation would look similar to following example:
For the simplest possible (and most common) `Subscriber` implementation using the whitebox verification boils down to
exteding (or delegating to) your implementation with additionally signalling and registering the test probe, as shown in the below example:

```java
package com.example.streams;
Expand All @@ -305,21 +306,24 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
super(new TestEnvironment());
}

// The implementation under test is "SyncSubscriber":
// class SyncSubscriber<T> extends Subscriber<T> { /* ... */ }

@Override
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {

// return YOUR subscriber under-test, with additional WhiteboxSubscriberProbe instrumentation
return new Subscriber<Integer>() {

// in order to test the SyncSubscriber we must instrument it by extending it,
// and calling the WhiteboxSubscriberProbe in all of the Subscribers methods:
return new SyncSubscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
// in addition to normal Subscriber work that you're testing,
// register a SubscriberPuppet, to give the TCK control over demand generation and cancelling
probe.registerOnSubscribe(new SubscriberPuppet() {
super.onSubscribe(s);

// register a successful subscription, and create a Puppet,
// for the WhiteboxVerification to be able to drive its tests:
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long n) {
s.request(n);
public void triggerRequest(long elements) {
s.request(elements);
}

@Override
Expand All @@ -330,20 +334,20 @@ public class MySubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVeri
}

@Override
public void onNext(Integer value) {
// in addition to normal Subscriber work that you're testing, register onNext with the probe
probe.registerOnNext(value);
public void onNext(Integer element) {
super.onNext(element);
probe.registerOnNext(element);
}

@Override
public void onError(Throwable cause) {
// in addition to normal Subscriber work that you're testing, register onError with the probe
super.onError(cause);
probe.registerOnError(cause);
}

@Override
public void onComplete() {
// in addition to normal Subscriber work that you're testing, register onComplete with the probe
super.onComplete();
probe.registerOnComplete();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev
}

@Override @Test
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,31 +203,35 @@ public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterReciev

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.5
@Override @Test
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
new WhiteboxTestStage(env) {{
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
final Latch secondSubscriptionCancelled = new Latch(env);
sub().onSubscribe(
new Subscription() {
@Override
public void request(long elements) {
env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`", sub(), elements));
}
public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws Throwable {
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
final Latch secondSubscriptionCancelled = new Latch(env);
final Subscriber<? super T> sub = stage.sub();
final Subscription subscription = new Subscription() {
@Override
public void request(long elements) {
// ignore...
}

@Override
public void cancel() {
secondSubscriptionCancelled.close();
}
@Override
public void cancel() {
secondSubscriptionCancelled.close();
}

@Override
public String toString() {
return "SecondSubscription(should get cancelled)";
}
});
@Override
public String toString() {
return "SecondSubscription(should get cancelled)";
}
};
sub.onSubscribe(subscription);

secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
env.verifyNoAsyncErrors();
}};
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
env.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.6
Expand Down Expand Up @@ -348,50 +352,38 @@ public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParame
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
@Override @Test
public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

final Subscription subscription = new Subscription() {
@Override
public void request(final long elements) {
}

@Override
public void cancel() {
}
};

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onNext(null);
} catch (final NullPointerException expected) {
gotNPE = true;
}
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
try {
sub.onNext(null);
} catch (final NullPointerException expected) {
gotNPE = true;
} finally {
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}
Expand All @@ -403,29 +395,17 @@ public void required_spec213_onError_mustThrowNullPointerExceptionWhenParameters
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

final Subscription subscription = new Subscription() {
@Override
public void request(final long elements) {
}

@Override
public void cancel() {
}
};

{
final Subscriber<? super T> sub = stage.sub();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onError(null);
} catch (final NullPointerException expected) {
gotNPE = true;
} finally {
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
}
});
}
Expand Down Expand Up @@ -495,11 +475,24 @@ abstract class TestStageTestRun {
public abstract void run(WhiteboxTestStage stage) throws Throwable;
}

/**
* Prepares subscriber and publisher pair (by subscribing the first to the latter),
* and then hands over the tests {@link WhiteboxTestStage} over to the test.
*
* The test stage is, like in a puppet show, used to orchestrate what each participant should do.
* Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
*/
public void subscriberTest(TestStageTestRun body) throws Throwable {
WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
body.run(stage);
}

/**
* Provides a {@link WhiteboxTestStage} without performing any additional setup,
* like the {@link org.reactivestreams.tck.SubscriberWhiteboxVerification#subscriberTest(TestStageTestRun)} would.
*
* Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
*/
public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
body.run(stage);
Expand Down Expand Up @@ -559,7 +552,10 @@ public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment
}

public T signalNext() throws InterruptedException {
T element = nextT();
return signalNext(nextT());
}

private T signalNext(T element) throws InterruptedException {
sendNext(element);
return element;
}
Expand All @@ -577,7 +573,7 @@ public void verifyNoAsyncErrors() {
/**
* This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
* in order to allow intercepting calls on the underlying {@code Subscriber}.
* This delegation allows the proxy to implement {@link org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxProbe} assertions.
* This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
*/
public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {

Expand Down Expand Up @@ -741,8 +737,6 @@ private SubscriberPuppet puppet() {
public void registerOnSubscribe(SubscriberPuppet p) {
if (!puppet.isCompleted()) {
puppet.complete(p);
} else {
env.flop(String.format("Subscriber %s illegally accepted a second Subscription", sub()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface SubscriberWhiteboxVerificationRules {
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable;
void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable;
void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception;
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception;
void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable;
void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception;
void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception;
void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
}
}).required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
}
}, "illegally accepted a second Subscription");
}, "Expected 2nd Subscription given to subscriber to be cancelled");
}

@Test
Expand Down