14
14
import org .reactivestreams .Publisher ;
15
15
import org .reactivestreams .Subscriber ;
16
16
import org .reactivestreams .Subscription ;
17
- import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
18
17
import org .reactivestreams .tck .flow .support .Optional ;
18
+ import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
19
19
20
20
import java .util .Collections ;
21
21
import java .util .LinkedList ;
24
24
import java .util .concurrent .CopyOnWriteArrayList ;
25
25
import java .util .concurrent .CountDownLatch ;
26
26
import java .util .concurrent .TimeUnit ;
27
- import java .util .concurrent .atomic .AtomicBoolean ;
28
27
import java .util .concurrent .atomic .AtomicReference ;
29
28
29
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
30
+ import static java .util .concurrent .TimeUnit .NANOSECONDS ;
30
31
import static org .testng .Assert .assertTrue ;
31
32
import static org .testng .Assert .fail ;
32
33
@@ -37,8 +38,10 @@ public class TestEnvironment {
37
38
private static final long DEFAULT_TIMEOUT_MILLIS = 100 ;
38
39
39
40
private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS" ;
41
+ private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV" ;
40
42
41
43
private final long defaultTimeoutMillis ;
44
+ private final long defaultPollTimeoutMillis ;
42
45
private final long defaultNoSignalsTimeoutMillis ;
43
46
private final boolean printlnDebug ;
44
47
@@ -51,14 +54,46 @@ public class TestEnvironment {
51
54
* run the tests.
52
55
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
53
56
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
57
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
58
+ * preempted by an asynchronous event.
54
59
* @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
55
60
*/
56
- public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
61
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ,
62
+ boolean printlnDebug ) {
57
63
this .defaultTimeoutMillis = defaultTimeoutMillis ;
64
+ this .defaultPollTimeoutMillis = defaultPollTimeoutMillis ;
58
65
this .defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis ;
59
66
this .printlnDebug = printlnDebug ;
60
67
}
61
68
69
+ /**
70
+ * Tests must specify the timeout for expected outcome of asynchronous
71
+ * interactions. Longer timeout does not invalidate the correctness of
72
+ * the implementation, but can in some cases result in longer time to
73
+ * run the tests.
74
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
75
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
76
+ * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
77
+ */
78
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
79
+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultTimeoutMillis , printlnDebug );
80
+ }
81
+
82
+ /**
83
+ * Tests must specify the timeout for expected outcome of asynchronous
84
+ * interactions. Longer timeout does not invalidate the correctness of
85
+ * the implementation, but can in some cases result in longer time to
86
+ * run the tests.
87
+ *
88
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
89
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
90
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
91
+ * preempted by an asynchronous event.
92
+ */
93
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ) {
94
+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultPollTimeoutMillis , false );
95
+ }
96
+
62
97
/**
63
98
* Tests must specify the timeout for expected outcome of asynchronous
64
99
* interactions. Longer timeout does not invalidate the correctness of
@@ -69,7 +104,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
69
104
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
70
105
*/
71
106
public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis ) {
72
- this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , false );
107
+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultNoSignalsTimeoutMillis );
73
108
}
74
109
75
110
/**
@@ -81,7 +116,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
81
116
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
82
117
*/
83
118
public TestEnvironment (long defaultTimeoutMillis ) {
84
- this (defaultTimeoutMillis , defaultTimeoutMillis , false );
119
+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultTimeoutMillis );
85
120
}
86
121
87
122
/**
@@ -97,7 +132,7 @@ public TestEnvironment(long defaultTimeoutMillis) {
97
132
* often helpful to pinpoint simple race conditions etc.
98
133
*/
99
134
public TestEnvironment (boolean printlnDebug ) {
100
- this (envDefaultTimeoutMillis (), envDefaultNoSignalsTimeoutMillis (), printlnDebug );
135
+ this (envDefaultTimeoutMillis (), envDefaultNoSignalsTimeoutMillis (), envDefaultPollTimeoutMillis (), printlnDebug );
101
136
}
102
137
103
138
/**
@@ -126,6 +161,14 @@ public long defaultNoSignalsTimeoutMillis() {
126
161
return defaultNoSignalsTimeoutMillis ;
127
162
}
128
163
164
+ /**
165
+ * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
166
+ * event.
167
+ */
168
+ public long defaultPollTimeoutMillis () {
169
+ return defaultPollTimeoutMillis ;
170
+ }
171
+
129
172
/**
130
173
* Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
131
174
*
@@ -156,6 +199,21 @@ public static long envDefaultNoSignalsTimeoutMillis() {
156
199
}
157
200
}
158
201
202
+ /**
203
+ * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
204
+ *
205
+ * @throws java.lang.IllegalArgumentException when unable to parse the env variable
206
+ */
207
+ public static long envDefaultPollTimeoutMillis () {
208
+ final String envMillis = System .getenv (DEFAULT_POLL_TIMEOUT_MILLIS_ENV );
209
+ if (envMillis == null ) return envDefaultTimeoutMillis ();
210
+ else try {
211
+ return Long .parseLong (envMillis );
212
+ } catch (NumberFormatException ex ) {
213
+ throw new IllegalArgumentException (String .format ("Unable to parse %s env value [%s] as long!" , DEFAULT_POLL_TIMEOUT_MILLIS_ENV , envMillis ), ex );
214
+ }
215
+ }
216
+
159
217
/**
160
218
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
161
219
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -277,7 +335,7 @@ public Throwable dropAsyncError() {
277
335
}
278
336
279
337
/**
280
- * Waits for {@link TestEnvironment#defaultTimeoutMillis ()} and then verifies that no asynchronous errors
338
+ * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis ()} and then verifies that no asynchronous errors
281
339
* were signalled pior to, or during that time (by calling {@code flop()}).
282
340
*/
283
341
public void verifyNoAsyncErrors () {
@@ -519,42 +577,57 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
519
577
}
520
578
521
579
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart ) throws Exception {
522
- expectErrorWithMessage (expected , requiredMessagePart , env .defaultTimeoutMillis ());
580
+ expectErrorWithMessage (expected , Collections . singletonList ( requiredMessagePart ) , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis ());
523
581
}
524
582
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ) throws Exception {
525
- expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis ());
583
+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis () );
526
584
}
527
585
528
586
@ SuppressWarnings ("ThrowableResultOfMethodCallIgnored" )
529
587
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart , long timeoutMillis ) throws Exception {
530
588
expectErrorWithMessage (expected , Collections .singletonList (requiredMessagePart ), timeoutMillis );
531
589
}
590
+
532
591
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives , long timeoutMillis ) throws Exception {
533
- final E err = expectError (expected , timeoutMillis );
592
+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , timeoutMillis , timeoutMillis );
593
+ }
594
+
595
+ public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ,
596
+ long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
597
+ final E err = expectError (expected , totalTimeoutMillis , pollTimeoutMillis );
534
598
final String message = err .getMessage ();
535
-
599
+
536
600
boolean contains = false ;
537
- for (String requiredMessagePart : requiredMessagePartAlternatives )
601
+ for (String requiredMessagePart : requiredMessagePartAlternatives )
538
602
if (message .contains (requiredMessagePart )) contains = true ; // not short-circuting loop, it is expected to
539
603
assertTrue (contains ,
540
- String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
541
- err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
604
+ String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
605
+ err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
542
606
}
543
607
544
608
public <E extends Throwable > E expectError (Class <E > expected ) throws Exception {
545
609
return expectError (expected , env .defaultTimeoutMillis ());
546
610
}
547
611
548
612
public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis ) throws Exception {
549
- return expectError (expected , timeoutMillis , String . format ( "Expected onError(%s)" , expected . getName () ));
613
+ return expectError (expected , timeoutMillis , env . defaultPollTimeoutMillis ( ));
550
614
}
551
615
552
616
public <E extends Throwable > E expectError (Class <E > expected , String errorMsg ) throws Exception {
553
617
return expectError (expected , env .defaultTimeoutMillis (), errorMsg );
554
618
}
555
619
556
620
public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis , String errorMsg ) throws Exception {
557
- return received .expectError (expected , timeoutMillis , errorMsg );
621
+ return expectError (expected , timeoutMillis , env .defaultPollTimeoutMillis (), errorMsg );
622
+ }
623
+
624
+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
625
+ return expectError (expected , totalTimeoutMillis , pollTimeoutMillis , String .format ("Expected onError(%s)" , expected .getName ()));
626
+ }
627
+
628
+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ,
629
+ String errorMsg ) throws Exception {
630
+ return received .expectError (expected , totalTimeoutMillis , pollTimeoutMillis , errorMsg );
558
631
}
559
632
560
633
public void expectNone () throws InterruptedException {
@@ -1025,22 +1098,44 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
1025
1098
} // else, ok
1026
1099
}
1027
1100
1028
- @ SuppressWarnings ("unchecked" )
1101
+ /**
1102
+ * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
1103
+ */
1104
+ @ Deprecated
1029
1105
public <E extends Throwable > E expectError (Class <E > clazz , long timeoutMillis , String errorMsg ) throws Exception {
1030
- Thread .sleep (timeoutMillis );
1031
-
1032
- if (env .asyncErrors .isEmpty ()) {
1033
- return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
1034
- } else {
1035
- // ok, there was an expected error
1036
- Throwable thrown = env .asyncErrors .remove (0 );
1106
+ return expectError (clazz , timeoutMillis , timeoutMillis , errorMsg );
1107
+ }
1037
1108
1038
- if (clazz .isInstance (thrown )) {
1039
- return (E ) thrown ;
1109
+ @ SuppressWarnings ("unchecked" )
1110
+ final <E extends Throwable > E expectError (Class <E > clazz , final long totalTimeoutMillis ,
1111
+ long pollTimeoutMillis ,
1112
+ String errorMsg ) throws Exception {
1113
+ long totalTimeoutRemainingNs = MILLISECONDS .toNanos (totalTimeoutMillis );
1114
+ long timeStampANs = System .nanoTime ();
1115
+ long timeStampBNs ;
1116
+
1117
+ for (;;) {
1118
+ Thread .sleep (Math .min (pollTimeoutMillis , NANOSECONDS .toMillis (totalTimeoutRemainingNs )));
1119
+
1120
+ if (env .asyncErrors .isEmpty ()) {
1121
+ timeStampBNs = System .nanoTime ();
1122
+ totalTimeoutRemainingNs =- timeStampBNs - timeStampANs ;
1123
+ timeStampANs = timeStampBNs ;
1124
+
1125
+ if (totalTimeoutRemainingNs <= 0 ) {
1126
+ return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , totalTimeoutMillis ));
1127
+ }
1040
1128
} else {
1129
+ // ok, there was an expected error
1130
+ Throwable thrown = env .asyncErrors .remove (0 );
1131
+
1132
+ if (clazz .isInstance (thrown )) {
1133
+ return (E ) thrown ;
1134
+ } else {
1041
1135
1042
- return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1043
- errorMsg , timeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1136
+ return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1137
+ errorMsg , totalTimeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1138
+ }
1044
1139
}
1045
1140
}
1046
1141
}
0 commit comments