File tree 1 file changed +53
-0
lines changed
tck-flow/src/test/java/org/reactivestreams/tck/flow
1 file changed +53
-0
lines changed Original file line number Diff line number Diff line change
1
+ /************************************************************************
2
+ * Licensed under Public Domain (CC0) *
3
+ * *
4
+ * To the extent possible under law, the person who associated CC0 with *
5
+ * this code has waived all copyright and related or neighboring *
6
+ * rights to this code. *
7
+ * *
8
+ * You should have received a copy of the CC0 legalcode along with this *
9
+ * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10
+ ************************************************************************/
11
+
12
+ package org .reactivestreams .tck .flow ;
13
+
14
+ import org .reactivestreams .tck .TestEnvironment ;
15
+
16
+ import java .util .concurrent .*;
17
+
18
+ public class SubmissionPublisherTest extends FlowPublisherVerification <Integer > {
19
+
20
+ public SubmissionPublisherTest () {
21
+ super (new TestEnvironment ());
22
+ }
23
+
24
+ @ Override
25
+ public Flow .Publisher <Integer > createFlowPublisher (long elements ) {
26
+ SubmissionPublisher <Integer > sp = new SubmissionPublisher <>();
27
+
28
+ ForkJoinPool .commonPool ().submit (() -> {
29
+ while (sp .getNumberOfSubscribers () == 0 ) {
30
+ try {
31
+ Thread .sleep (1 );
32
+ } catch (InterruptedException ex ) {
33
+ return ;
34
+ }
35
+ }
36
+
37
+ for (int i = 0 ; i < elements ; i ++) {
38
+ sp .submit (i );
39
+ }
40
+
41
+ sp .close ();
42
+ });
43
+
44
+ return sp ;
45
+ }
46
+
47
+ @ Override
48
+ public Flow .Publisher <Integer > createFailedFlowPublisher () {
49
+ SubmissionPublisher <Integer > sp = new SubmissionPublisher <>();
50
+ sp .closeExceptionally (new Exception ());
51
+ return sp ;
52
+ }
53
+ }
You can’t perform that action at this time.
0 commit comments