diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java new file mode 100644 index 00000000..65b3d5e9 --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java @@ -0,0 +1,215 @@ +/*************************************************** + * Licensed under MIT No Attribution (SPDX: MIT-0) * + ***************************************************/ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +@Test // Must be here for TestNG to find and run this, do not remove +public class AsyncRangePublisherTest extends PublisherVerification { + private static final int TERMINAL_DELAY_MS = 20; + private static final int DEFAULT_TIMEOUT_MS = 5000; + private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2; + private ExecutorService e; + @BeforeClass + void before() { e = Executors.newCachedThreadPool(); } + @AfterClass + void after() { if (e != null) e.shutdown(); } + + public AsyncRangePublisherTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS)); + } + + @Override + public Publisher createPublisher(long elements) { + return new AsyncPublisher(new RangePublisher(1, (int)elements), e); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } + + private static final class AsyncPublisher implements Publisher { + private final Publisher original; + private final Executor executor; + + private AsyncPublisher(Publisher original, Executor executor) { + this.original = requireNonNull(original); + this.executor = requireNonNull(executor); + } + + @Override + public void subscribe(Subscriber s) { + AsyncSubscriber.wrapAndSubscribe(original, requireNonNull(s), executor); + } + + private static final class AsyncSubscriber implements Subscriber { + private final BlockingQueue signalQueue = new LinkedBlockingQueue(); + + static void wrapAndSubscribe(final Publisher publisher, + final Subscriber targetSubscriber, final Executor executor) { + final AsyncSubscriber asyncSubscriber = new AsyncSubscriber(); + try { + executor.execute(new Runnable() { + private Subscription subscription; + private boolean terminated; + @Override + public void run() { + try { + for (; ; ) { + final Object signal = asyncSubscriber.signalQueue.take(); + if (signal instanceof Cancelled) { + return; + } else if (signal instanceof TerminalSignal) { + // sleep intentional to verify TestEnvironment.expectError behavior. + Thread.sleep(TERMINAL_DELAY_MS); + + TerminalSignal terminalSignal = (TerminalSignal) signal; + terminated = true; + if (terminalSignal.cause == null) { + targetSubscriber.onComplete(); + } else { + targetSubscriber.onError(terminalSignal.cause); + } + return; + } else if (signal instanceof OnSubscribeSignal) { + // We distribute the subscription downstream and may also call cancel on this + // thread if an exception is thrown. Since there is no concurrency allowed, make + // the subscription safe for concurrency. + subscription = concurrentSafe(((OnSubscribeSignal) signal).subscription); + targetSubscriber.onSubscribe(subscription); + } else { + @SuppressWarnings("unchecked") final T onNextSignal = ((OnNextSignal) signal).onNext; + targetSubscriber.onNext(onNextSignal); + } + } + } catch (Throwable cause) { + if (!terminated) { + try { + if (subscription == null) { + targetSubscriber.onSubscribe(noopSubscription()); + } else { + subscription.cancel(); + } + } finally { + terminated = true; + targetSubscriber.onError(new IllegalStateException("run loop interrupted", cause)); + } + } + } + } + }); + } catch (Throwable cause) { + try { + targetSubscriber.onSubscribe(noopSubscription()); + } finally { + targetSubscriber.onError(new IllegalStateException("Executor rejected", cause)); + } + // Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid + // duplicate termination. + return; + } + publisher.subscribe(asyncSubscriber); + } + + @Override + public void onSubscribe(final Subscription s) { + signalQueue.add(new OnSubscribeSignal(new Subscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + try { + s.cancel(); + } finally { + signalQueue.add(new Cancelled()); + } + } + })); + } + + @Override + public void onNext(T t) { + signalQueue.add(new OnNextSignal(t)); + } + + @Override + public void onError(Throwable t) { + signalQueue.add(new TerminalSignal(requireNonNull(t))); + } + + @Override + public void onComplete() { + signalQueue.add(new TerminalSignal(null)); + } + } + + private static Subscription concurrentSafe(final Subscription subscription) { + // TODO: make concurrent safe. TCK interacts from the subscription concurrently so lock isn't sufficient. + return subscription; + } + + private static Subscription noopSubscription() { + return new Subscription() { + @Override + public void request(long n) { + } + + @Override + public void cancel() { + } + }; + } + + private static final class TerminalSignal { + private final Throwable cause; + + private TerminalSignal(Throwable cause) { + this.cause = cause; + } + } + + private static final class OnSubscribeSignal { + private final Subscription subscription; + + private OnSubscribeSignal(Subscription subscription) { + this.subscription = subscription; + } + } + + private static final class OnNextSignal { + private final T onNext; + + private OnNextSignal(T onNext) { + this.onNext = onNext; + } + } + + private static final class Cancelled { + } + + private static T requireNonNull(T o) { + if (o == null) { + throw new NullPointerException(); + } + return o; + } + } +} diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index a70271ab..51f6c9f2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -1112,7 +1112,7 @@ final E expectError(Class clazz, final long totalTimeou if (env.asyncErrors.isEmpty()) { timeStampBNs = System.nanoTime(); - totalTimeoutRemainingNs =- timeStampBNs - timeStampANs; + totalTimeoutRemainingNs -= (timeStampBNs - timeStampANs); timeStampANs = timeStampBNs; if (totalTimeoutRemainingNs <= 0) {