From 8c4703e0616194387df2e4e2c32b1973e71be4c1 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 12 Mar 2018 23:49:53 +0900 Subject: [PATCH] =tck #427 --- .../unicast/SyncSubscriberWhiteboxTest.java | 1 - .../tck/SubscriberBlackboxVerification.java | 78 ++++++++++++++++++- .../reactivestreams/tck/TestEnvironment.java | 1 - .../SubscriberBlackboxVerificationRules.java | 20 ++++- .../SubscriberBlackboxVerificationTest.java | 61 ++++++++++++++- 5 files changed, 155 insertions(+), 6 deletions(-) diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java index 06f7cc9b..32d39e04 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SyncSubscriberWhiteboxTest.java @@ -13,7 +13,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.SubscriberBlackboxVerification; import org.reactivestreams.tck.SubscriberWhiteboxVerification; import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.AfterClass; diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index 8931ec93..63940536 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -25,8 +25,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; import static org.testng.Assert.assertTrue; @@ -231,6 +234,77 @@ public String toString() { }}; } + @Override @Test + public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently() throws Throwable { + blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { + @Override + public void run(BlackboxTestStage stage) throws Throwable { + final Subscriber sub = createSubscriber(); + + final AtomicInteger cancellationCounter = new AtomicInteger(0); + final AtomicReference firstAcceptedSubscription = new AtomicReference(null); + final AtomicReference illegallyAcceptedSubscription = new AtomicReference(null); + + final int tasksToSubmit = 8 * Runtime.getRuntime().availableProcessors(); + + Callable invokeOnSubscribe = new Callable() { + final Subscriber targetSubscriber = sub; + + @Override + public Void call() throws Exception { + targetSubscriber.onSubscribe(new Subscription() { + + // we keep around the thread name to help identify which thread's subscription was accepted first, + // and then illegally second + final String nameWithThread = "Subscription"; + + @Override public void request(long elements) { + if (firstAcceptedSubscription.compareAndSet(null, this)) { + env.debug(String.format("Accepted subscription %s (first)", firstAcceptedSubscription.get())); + // first one accepted, this is legal + // > invokeOnSubscribeConcurrentlyTasks = new ArrayList>(tasksToSubmit); + for (int i = 0; i < tasksToSubmit; i++) { + invokeOnSubscribeConcurrentlyTasks.add(invokeOnSubscribe); + } + + List> futures = Executors.newFixedThreadPool(16).invokeAll(invokeOnSubscribeConcurrentlyTasks); + for (Future future : futures) { + future.get(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS); + } + + try { + env.verifyNoAsyncErrors(); + } finally { + Subscription accepted = firstAcceptedSubscription.get(); + if (accepted != null) accepted.cancel(); + } + + // sendCompletion(); // we're done, complete the subscriber under test + } + }); + } + @Override @Test public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { notVerified(); // cannot be meaningfully tested, or can it? diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index ce0a5b99..fbb87a1d 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.testng.Assert.assertTrue; diff --git a/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java index e240c780..796c1c51 100644 --- a/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/flow/support/SubscriberBlackboxVerificationRules.java @@ -133,7 +133,25 @@ public interface SubscriberBlackboxVerificationRules { * */ void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception; - + /** + * Asks for a {@code Subscriber}, signals {@code onSubscribe} from different threads ... + *

+ * Verifies rule: 2.5 + *

+ * Notes: + *

    + *
  • The test doesn't signal any other events than {@code onSubscribe} and may cause resource leak in + * {@code Subscriber}s that expect a finite {@code Publisher}. + *
+ *

+ * If this test fails, the following could be checked within the {@code Subscriber} implementation: + *

    + *
  • if the {@code Subscribe.onSubscribe} implementation actually tries to detect multiple calls to it,
  • + *
  • if the second {@code Subscription} is cancelled asynchronously and that takes longer time than + * the {@code TestEnvironment}'s timeout permits.
  • + *
+ */ + void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently() throws Throwable; /** * Currently, this test is skipped because it requires more control over the {@code Subscriber} implementation * to make it cancel the {@code Subscription} for some external condition. diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java index 9f5055bf..bd726c7b 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * Validates that the TCK's {@link org.reactivestreams.tck.SubscriberBlackboxVerification} fails with nice human readable errors. @@ -137,10 +138,68 @@ public void onComplete() { completion.countDown(); } }).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); - + completion.await(1, TimeUnit.SECONDS); } + @Test + public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently_shouldFail() throws Throwable { + requireTestFailure(new ThrowingRunnable() { + @Override public void run() throws Throwable { + customSubscriberVerification( + // THIS IS AN INCORRECT IMPLEMENTATION SINCE THE subscription ACCESS IS NOT SYNCHRONIZED: + new NoopSubscriber() { + + Subscription sub = null; // on purpose missing volatile or any external synchronization + + @Override + public void onSubscribe(Subscription s) { + if (this.sub == null) { // since sub is not volatile, this is racy, and we may accept (illegally) multiple subscriptions + + // This sleep is only here to make the "test testing the TCK" with this illegal implementation + // more shockinly expose the potential race condition here. In general implementations which naively + // check for sub == null, and then assign / assume this accept was valid, are prone to the race + // condition of multiple publishers sending them a subscription concurrently, thus one may accidentally + // accept two subscriptions -- which is illegal + // + // The sleep is not necessary to expose this race, however it makes this test testing the TCK more reliable on CI. + try { Thread.sleep(10); } + catch (InterruptedException ignored) {} + + this.sub = s; + s.request(1); + } else { + s.cancel(); + } + } + } + ).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently(); + } + }, "ilegally accepted and issued request() to two subscriptions, violating rule 2.5; The accepted subscriptions are"); + } + + @Test + public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently_shouldPass() throws Throwable { + customSubscriberVerification( + new NoopSubscriber() { + final AtomicReference sub = new AtomicReference(); + + @Override + public void onSubscribe(Subscription s) { + if (sub.compareAndSet(null, s)) { + // successfully accepted subscription, may issue requests to it + s.request(1); + // s.cancel(); // this is fine + } else { + // all others must be cancelled + s.cancel(); + } + } + } + ).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently(); + } + + @Test public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable { requireTestFailure(new ThrowingRunnable() {