diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java new file mode 100644 index 00000000..f9857246 --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java @@ -0,0 +1,60 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.Test; + +import java.util.concurrent.*; + +@Test +public class SubmissionPublisherTest extends FlowPublisherVerification { + + public SubmissionPublisherTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Publisher createFlowPublisher(final long elements) { + final SubmissionPublisher sp = new SubmissionPublisher(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + while (sp.getNumberOfSubscribers() == 0) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + } + + for (int i = 0; i < elements; i++) { + sp.submit(i); + } + + sp.close(); + } + }); + t.setDaemon(true); + t.start(); + + return sp; + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + SubmissionPublisher sp = new SubmissionPublisher(); + sp.closeExceptionally(new Exception()); + return sp; + } +}