From 755e4bf5fbb204303aaf439bcea6f68c12242095 Mon Sep 17 00:00:00 2001 From: Luke Daley Date: Mon, 6 Apr 2015 09:30:47 +1000 Subject: [PATCH 1/2] Cancel the subscription after receiving all of the pertinent emissions (#259). --- .../java/org/reactivestreams/tck/PublisherVerification.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index ea625d96..d82563c5 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -1044,6 +1044,8 @@ public void onNext(T element) { if (callsCounter > 0) { subscription.value().request(Long.MAX_VALUE - 1); callsCounter--; + } else { + subscription.value().cancel(); } } else { env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); From f07a8d4b4d8db126de9a30a2103bce7dbd5406af Mon Sep 17 00:00:00 2001 From: Luke Daley Date: Wed, 8 Apr 2015 11:46:27 +1000 Subject: [PATCH 2/2] Test that 'required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue' completes in a timely manner for fully synchronous publishers (#259). --- .../tck/PublisherVerificationTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index 7c04de32..4b22b586 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -5,6 +5,7 @@ import org.reactivestreams.Subscription; import org.reactivestreams.tck.support.TCKVerificationSupport; import org.reactivestreams.tck.support.TestException; +import org.testng.Assert; import org.testng.annotations.Test; import java.util.Random; @@ -585,6 +586,38 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa }, "Async error during test execution: Illegally signalling onError too soon!"); } + @Test + public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue_forSynchronousPublisher() throws Throwable { + final AtomicInteger sent = new AtomicInteger(); + + customPublisherVerification(new Publisher() { + @Override + public void subscribe(final Subscriber downstream) { + downstream.onSubscribe(new Subscription() { + boolean started; + boolean cancelled; + + @Override + public void request(long n) { + if (!started) { + started = true; + while (!cancelled) { + downstream.onNext(sent.getAndIncrement()); + } + } + } + + @Override + public void cancel() { + cancelled = true; + } + }); + } + }).required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); + + // 11 due to the implementation of this particular TCK test (see impl) + Assert.assertEquals(sent.get(), 11); + } // FAILING IMPLEMENTATIONS //