File tree 1 file changed +20
-15
lines changed
tck-flow/src/test/java/org/reactivestreams/tck/flow
1 file changed +20
-15
lines changed Original file line number Diff line number Diff line change 12
12
package org .reactivestreams .tck .flow ;
13
13
14
14
import org .reactivestreams .tck .TestEnvironment ;
15
+ import org .testng .annotations .Test ;
15
16
16
17
import java .util .concurrent .*;
17
18
19
+ @ Test
18
20
public class SubmissionPublisherTest extends FlowPublisherVerification <Integer > {
19
21
20
22
public SubmissionPublisherTest () {
21
23
super (new TestEnvironment ());
22
24
}
23
25
24
26
@ Override
25
- public Flow .Publisher <Integer > createFlowPublisher (long elements ) {
26
- SubmissionPublisher <Integer > sp = new SubmissionPublisher <>();
27
-
28
- ForkJoinPool .commonPool ().submit (() -> {
29
- while (sp .getNumberOfSubscribers () == 0 ) {
30
- try {
31
- Thread .sleep (1 );
32
- } catch (InterruptedException ex ) {
33
- return ;
27
+ public Flow .Publisher <Integer > createFlowPublisher (final long elements ) {
28
+ final SubmissionPublisher <Integer > sp = new SubmissionPublisher <Integer >();
29
+
30
+ ForkJoinPool .commonPool ().submit (new Runnable () {
31
+ @ Override
32
+ public void run () {
33
+ while (sp .getNumberOfSubscribers () == 0 ) {
34
+ try {
35
+ Thread .sleep (1 );
36
+ } catch (InterruptedException ex ) {
37
+ return ;
38
+ }
34
39
}
35
- }
36
40
37
- for (int i = 0 ; i < elements ; i ++) {
38
- sp .submit (i );
39
- }
41
+ for (int i = 0 ; i < elements ; i ++) {
42
+ sp .submit (i );
43
+ }
40
44
41
- sp .close ();
45
+ sp .close ();
46
+ }
42
47
});
43
48
44
49
return sp ;
45
50
}
46
51
47
52
@ Override
48
53
public Flow .Publisher <Integer > createFailedFlowPublisher () {
49
- SubmissionPublisher <Integer > sp = new SubmissionPublisher <>();
54
+ SubmissionPublisher <Integer > sp = new SubmissionPublisher <Integer >();
50
55
sp .closeExceptionally (new Exception ());
51
56
return sp ;
52
57
}
You can’t perform that action at this time.
0 commit comments