Skip to content

Fixes #233 by implementing support for triggered demand in in the SubscriberBlackboxVerification #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ protected SubscriberBlackboxVerification(TestEnvironment env) {
*/
public abstract Subscriber<T> createSubscriber();

/**
* Override this method if the Subscriber implementation you are verifying
* needs an external signal before it signals demand to its Publisher.
*
* By default this method does nothing.
*/
public void triggerRequest(final Subscriber<? super T> subscriber) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"this page method body was intentionally left blank." 😉

}

// ENV SETUP

/**
Expand All @@ -74,6 +84,7 @@ public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() t
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws InterruptedException {
triggerRequest(stage.subProxy().sub());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kon!

final long n = stage.expectRequest();// assuming subscriber wants to consume elements...

// should cope with up to requested number of elements
Expand Down Expand Up @@ -248,6 +259,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

pub.subscribe(probe);
triggerRequest(sub);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

probe.expectCompletion();
probe.expectNone();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.reactivestreams.tck;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncTriggeredDemandSubscriberTest extends SubscriberBlackboxVerification<Integer> {

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public SyncTriggeredDemandSubscriberTest() {
super(new TestEnvironment());
}

@Override public void triggerRequest(final Subscriber<? super Integer> subscriber) {
((SyncTriggeredDemandSubscriber<? super Integer>)subscriber).triggerDemand(1);
}

@Override public Subscriber<Integer> createSubscriber() {
return new SyncTriggeredDemandSubscriber<Integer>() {
private long acc;
@Override protected long foreach(final Integer element) {
acc += element;
return 1;
}

@Override public void onComplete() {
}
};
}

@Override public Integer createElement(int element) {
return element;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.reactivestreams.tck;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Test // Must be here for TestNG to find and run this, do not remove
public class SyncTriggeredDemandSubscriberWhiteboxTest extends SubscriberWhiteboxVerification<Integer> {

private ExecutorService e;
@BeforeClass void before() { e = Executors.newFixedThreadPool(4); }
@AfterClass void after() { if (e != null) e.shutdown(); }

public SyncTriggeredDemandSubscriberWhiteboxTest() {
super(new TestEnvironment());
}

@Override
public Subscriber<Integer> createSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
return new SyncTriggeredDemandSubscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
super.onSubscribe(s);

probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long elements) {
s.request(elements);
}

@Override
public void signalCancel() {
s.cancel();
}
});
}

@Override
public void onNext(Integer element) {
super.onNext(element);
probe.registerOnNext(element);
}

@Override
public void onError(Throwable cause) {
super.onError(cause);
probe.registerOnError(cause);
}

@Override
public void onComplete() {
super.onComplete();
probe.registerOnComplete();
}

@Override
protected long foreach(Integer element) {
return 1;
}
};
}

@Override public Integer createElement(int element) {
return element;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.reactivestreams.tck.support;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* SyncTriggeredDemandSubscriber is an implementation of Reactive Streams `Subscriber`,
* it runs synchronously (on the Publisher's thread) and requests demand triggered from
* "the outside" using its `triggerDemand` method and from "the inside" using the return
* value of its user-defined `foreach` method which is invoked to process each element.
*
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
*/
public abstract class SyncTriggeredDemandSubscriber<T> implements Subscriber<T> {
private Subscription subscription; // Obeying rule 3.1, we make this private!
private boolean done = false;

@Override public void onSubscribe(final Subscription s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) throw null;

if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
try {
s.cancel(); // Cancel the additional subscription
} catch(final Throwable t) {
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
}
} else {
// We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
// Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
subscription = s;
}
}

/**
* Requests the provided number of elements from the `Subscription` of this `Subscriber`.
* NOTE: This makes no attempt at thread safety so only invoke it once from the outside to initiate the demand.
* @return `true` if successful and `false` if not (either due to no `Subscription` or due to exceptions thrown)
*/
public boolean triggerDemand(final long n) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments before merge here please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

final Subscription s = subscription;
if (s == null) return false;
else {
try {
s.request(n);
} catch(final Throwable t) {
// Subscription.request is not allowed to throw according to rule 3.16
(new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
return false;
}
return true;
}
}

@Override public void onNext(final T element) {
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
} else {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) throw null;

if (!done) { // If we aren't already done
try {
final long need = foreach(element);
if (need > 0) triggerDemand(need);
else if (need == 0) {}
else {
done();
}
} catch (final Throwable t) {
done();
try {
onError(t);
} catch (final Throwable t2) {
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
}
}
}

// Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
// herefor we also need to cancel our `Subscription`.
private void done() {
//On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements)
try {
subscription.cancel(); // Cancel the subscription
} catch(final Throwable t) {
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
}
}

// This method is left as an exercise to the reader/extension point
// Don't forget to call `triggerDemand` at the end if you are interested in more data,
// a return value of < 0 indicates that the subscription should be cancelled,
// a value of 0 indicates that there is no current need,
// a value of > 0 indicates the current need.
protected abstract long foreach(final T element);

@Override public void onError(final Throwable t) {
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
} else {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) throw null;
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
}
}

@Override public void onComplete() {
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
} else {
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
}
}
}