From 2338efd261e35376bdade9795c493ed2f0ea96d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 26 Nov 2017 22:37:16 +0100 Subject: [PATCH 1/3] Add SubmissionPublisher TCK test --- .../tck/flow/SubmissionPublisherTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java new file mode 100644 index 00000000..1981244f --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java @@ -0,0 +1,53 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.tck.TestEnvironment; + +import java.util.concurrent.*; + +public class SubmissionPublisherTest extends FlowPublisherVerification { + + public SubmissionPublisherTest() { + super(new TestEnvironment()); + } + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + SubmissionPublisher sp = new SubmissionPublisher<>(); + + ForkJoinPool.commonPool().submit(() -> { + while (sp.getNumberOfSubscribers() == 0) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + } + + for (int i = 0; i < elements; i++) { + sp.submit(i); + } + + sp.close(); + }); + + return sp; + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + SubmissionPublisher sp = new SubmissionPublisher<>(); + sp.closeExceptionally(new Exception()); + return sp; + } +} From 8cdf9133a212582041600d84861e7042d3b07425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 26 Nov 2017 22:42:34 +0100 Subject: [PATCH 2/3] Flow-TCK is set up as Java 6 compile target --- .../tck/flow/SubmissionPublisherTest.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java index 1981244f..9df75ba8 100644 --- a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java @@ -12,9 +12,11 @@ package org.reactivestreams.tck.flow; import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.Test; import java.util.concurrent.*; +@Test public class SubmissionPublisherTest extends FlowPublisherVerification { public SubmissionPublisherTest() { @@ -22,23 +24,26 @@ public SubmissionPublisherTest() { } @Override - public Flow.Publisher createFlowPublisher(long elements) { - SubmissionPublisher sp = new SubmissionPublisher<>(); - - ForkJoinPool.commonPool().submit(() -> { - while (sp.getNumberOfSubscribers() == 0) { - try { - Thread.sleep(1); - } catch (InterruptedException ex) { - return; + public Flow.Publisher createFlowPublisher(final long elements) { + final SubmissionPublisher sp = new SubmissionPublisher(); + + ForkJoinPool.commonPool().submit(new Runnable() { + @Override + public void run() { + while (sp.getNumberOfSubscribers() == 0) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } } - } - for (int i = 0; i < elements; i++) { - sp.submit(i); - } + for (int i = 0; i < elements; i++) { + sp.submit(i); + } - sp.close(); + sp.close(); + } }); return sp; @@ -46,7 +51,7 @@ public Flow.Publisher createFlowPublisher(long elements) { @Override public Flow.Publisher createFailedFlowPublisher() { - SubmissionPublisher sp = new SubmissionPublisher<>(); + SubmissionPublisher sp = new SubmissionPublisher(); sp.closeExceptionally(new Exception()); return sp; } From 4225c7983dee666920e4c9d949f22061328c7eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 27 Nov 2017 00:08:00 +0100 Subject: [PATCH 3/3] Try a plain thread in case FJP deadlocks on CI --- .../org/reactivestreams/tck/flow/SubmissionPublisherTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java index 9df75ba8..f9857246 100644 --- a/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/SubmissionPublisherTest.java @@ -27,7 +27,7 @@ public SubmissionPublisherTest() { public Flow.Publisher createFlowPublisher(final long elements) { final SubmissionPublisher sp = new SubmissionPublisher(); - ForkJoinPool.commonPool().submit(new Runnable() { + Thread t = new Thread(new Runnable() { @Override public void run() { while (sp.getNumberOfSubscribers() == 0) { @@ -45,6 +45,8 @@ public void run() { sp.close(); } }); + t.setDaemon(true); + t.start(); return sp; }