diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 617e0673..b8cc0d1c 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -212,17 +212,20 @@ public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfPr public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); - - sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); - sub.request(1); - sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); - sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); - - sub.request(1); - sub.request(2); - sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); - - sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); + try { + sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); + sub.request(1); + sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); + sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); + + sub.request(1); + sub.request(2); + sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); + + sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); + } finally { + sub.cancel(); + } } }); } @@ -486,30 +489,39 @@ public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws T @Override public void run(Publisher pub) throws Throwable { final Latch onSubscribeLatch = new Latch(env); - pub.subscribe(new Subscriber() { - @Override - public void onError(Throwable cause) { - onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); - } - - @Override - public void onSubscribe(Subscription subs) { - onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); - onSubscribeLatch.close(); - } - - @Override - public void onNext(T elem) { - onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); - } - - @Override - public void onComplete() { - onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); + final AtomicReference cancel = new AtomicReference(); + try { + pub.subscribe(new Subscriber() { + @Override + public void onError(Throwable cause) { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); + } + + @Override + public void onSubscribe(Subscription subs) { + cancel.set(subs); + onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); + onSubscribeLatch.close(); + } + + @Override + public void onNext(T elem) { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); + } + + @Override + public void onComplete() { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); + } + }); + onSubscribeLatch.expectClose("Should have received onSubscribe"); + env.verifyNoAsyncErrorsNoDelay(); + } finally { + Subscription s = cancel.getAndSet(null); + if (s != null) { + s.cancel(); } - }); - onSubscribeLatch.expectClose("Should have received onSubscribe"); - env.verifyNoAsyncErrorsNoDelay(); + } } }); } @@ -560,7 +572,15 @@ public void run(Publisher pub) throws Throwable { ManualSubscriber sub1 = env.newManualSubscriber(pub); ManualSubscriber sub2 = env.newManualSubscriber(pub); - env.verifyNoAsyncErrors(); + try { + env.verifyNoAsyncErrors(); + } finally { + try { + sub1.cancel(); + } finally { + sub2.cancel(); + } + } } }); } diff --git a/tck/src/test/java/org/reactivestreams/tck/RangePublisherTest.java b/tck/src/test/java/org/reactivestreams/tck/RangePublisherTest.java new file mode 100644 index 00000000..a76dee97 --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/RangePublisherTest.java @@ -0,0 +1,176 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; +import org.testng.annotations.*; + +@Test +public class RangePublisherTest extends PublisherVerification { + + static final Map stacks = new ConcurrentHashMap(); + + static final Map states = new ConcurrentHashMap(); + + static final AtomicInteger id = new AtomicInteger(); + + @AfterClass + public static void afterClass() { + boolean fail = false; + StringBuilder b = new StringBuilder(); + for (Map.Entry t : states.entrySet()) { + if (!t.getValue()) { + b.append("\r\n-------------------------------"); + for (Object o : stacks.get(t.getKey())) { + b.append("\r\nat ").append(o); + } + fail = true; + } + } + if (fail) { + throw new AssertionError("Cancellations were missing:" + b); + } + } + + public RangePublisherTest() { + super(new TestEnvironment()); + } + + @Override + public Publisher createPublisher(long elements) { + return new RangePublisher(1, elements); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } + + static final class RangePublisher + implements Publisher { + + final StackTraceElement[] stacktrace; + + final long start; + + final long count; + + RangePublisher(long start, long count) { + this.stacktrace = Thread.currentThread().getStackTrace(); + this.start = start; + this.count = count; + } + + @Override + public void subscribe(Subscriber s) { + if (s == null) { + throw new NullPointerException(); + } + + int ids = id.incrementAndGet(); + + RangeSubscription parent = new RangeSubscription(s, ids, start, start + count); + stacks.put(ids, stacktrace); + states.put(ids, false); + s.onSubscribe(parent); + } + + static final class RangeSubscription extends AtomicLong implements Subscription { + + private static final long serialVersionUID = 9066221863682220604L; + + final Subscriber actual; + + final int ids; + + final long end; + + long index; + + volatile boolean cancelled; + + RangeSubscription(Subscriber actual, int ids, long start, long end) { + this.actual = actual; + this.ids = ids; + this.index = start; + this.end = end; + } + + @Override + public void request(long n) { + if (!cancelled) { + if (n <= 0L) { + cancelled = true; + states.put(ids, true); + actual.onError(new IllegalArgumentException("ยง3.9 violated")); + return; + } + + for (;;) { + long r = get(); + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + if (r == 0) { + break; + } + return; + } + } + + long idx = index; + long f = end; + + for (;;) { + long e = 0; + while (e != n && idx != f) { + if (cancelled) { + return; + } + + actual.onNext((int)idx); + + idx++; + e++; + } + + if (idx == f) { + if (!cancelled) { + states.put(ids, true); + actual.onComplete(); + } + return; + } + + index = idx; + n = addAndGet(-n); + if (n == 0) { + break; + } + } + } + } + + @Override + public void cancel() { + cancelled = true; + states.put(ids, true); + } + } + } +}