Skip to content

Commit 4df0d4d

Browse files
committed
+TCK verifyNoAsyncErrors now by default waits, fixes spec111
Resolves #239
1 parent b33420a commit 4df0d4d

5 files changed

+66
-40
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public TestSetup apply(Long aLong) throws Throwable {
404404
sub1.expectError(ex);
405405
sub2.expectError(ex);
406406

407-
env.verifyNoAsyncErrors();
407+
env.verifyNoAsyncErrorsNoDelay();
408408
}};
409409
}
410410
});
@@ -476,7 +476,7 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
476476
sendError(ex);
477477
sub.expectError(ex); // "immediately", i.e. without a preceding request
478478

479-
env.verifyNoAsyncErrors();
479+
env.verifyNoAsyncErrorsNoDelay();
480480
}};
481481
}
482482

@@ -664,7 +664,7 @@ public TestSetup apply(Long subscribers) throws Throwable {
664664
sub1.expectCompletion(env.defaultTimeoutMillis());
665665
sub2.expectCompletion(env.defaultTimeoutMillis());
666666

667-
env.verifyNoAsyncErrors();
667+
env.verifyNoAsyncErrorsNoDelay();
668668
}};
669669
}
670670
});

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -456,9 +456,10 @@ public void run(Publisher<T> pub) throws Throwable {
456456
try {
457457
pub.subscribe(null);
458458
env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
459-
} catch (NullPointerException npe) {
459+
} catch (NullPointerException ignored) {
460+
// valid behaviour
460461
}
461-
env.verifyNoAsyncErrors();
462+
env.verifyNoAsyncErrorsNoDelay();
462463
}
463464
});
464465
}
@@ -493,7 +494,7 @@ public void onComplete() {
493494
}
494495
});
495496
onSubscribeLatch.expectClose("Should have received onSubscribe");
496-
env.verifyNoAsyncErrors();
497+
env.verifyNoAsyncErrorsNoDelay();
497498
}
498499
});
499500
}
@@ -524,7 +525,7 @@ public void onSubscribe(Subscription subs) {
524525
onSubscribeLatch.expectClose("Should have received onSubscribe");
525526
onErrorLatch.expectClose("Should have received onError");
526527

527-
env.verifyNoAsyncErrors();
528+
env.verifyNoAsyncErrorsNoDelay();
528529
}
529530
});
530531
}
@@ -773,7 +774,7 @@ public void onError(Throwable cause) {
773774
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
774775
oneMoreThanBoundedLimit);
775776
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
776-
env.verifyNoAsyncErrors();
777+
env.verifyNoAsyncErrorsNoDelay();
777778
} finally {
778779
// since the request/onNext recursive calls may keep the publisher running "forever",
779780
// we MUST cancel it manually before exiting this test case
@@ -824,7 +825,7 @@ public void cancel() {
824825
sub.request(1);
825826

826827
sub.expectNone();
827-
env.verifyNoAsyncErrors();
828+
env.verifyNoAsyncErrorsNoDelay();
828829
}
829830
});
830831
}
@@ -845,7 +846,7 @@ public void run(Publisher<T> pub) throws Throwable {
845846
subs.cancel();
846847

847848
sub.expectNone();
848-
env.verifyNoAsyncErrors();
849+
env.verifyNoAsyncErrorsNoDelay();
849850
}
850851
});
851852
}
@@ -936,7 +937,7 @@ > AsyncPublisher receives cancel() - handles it right away, by "stopping itself"
936937
}
937938
});
938939

939-
env.verifyNoAsyncErrors();
940+
env.verifyNoAsyncErrorsNoDelay();
940941
}
941942

942943
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13
@@ -972,7 +973,7 @@ public void run(Publisher<T> pub) throws Throwable {
972973
env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
973974
}
974975

975-
env.verifyNoAsyncErrors();
976+
env.verifyNoAsyncErrorsNoDelay();
976977
}
977978
});
978979
}
@@ -991,7 +992,7 @@ public void run(Publisher<T> pub) throws Throwable {
991992
sub.nextElements(totalElements);
992993
sub.expectCompletion();
993994

994-
env.verifyNoAsyncErrors();
995+
env.verifyNoAsyncErrorsNoDelay();
995996
}
996997
});
997998
}
@@ -1013,7 +1014,7 @@ public void run(Publisher<T> pub) throws Throwable {
10131014
sub.expectCompletion();
10141015

10151016
try {
1016-
env.verifyNoAsyncErrors();
1017+
env.verifyNoAsyncErrorsNoDelay();
10171018
} finally {
10181019
sub.cancel();
10191020
}
@@ -1086,7 +1087,7 @@ public void activePublisherTest(long elements, boolean completionSignalRequired,
10861087
} else {
10871088
Publisher<T> pub = createPublisher(elements);
10881089
body.run(pub);
1089-
env.verifyNoAsyncErrors();
1090+
env.verifyNoAsyncErrorsNoDelay();
10901091
}
10911092
}
10921093

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void cancel() {
121121
sub.onSubscribe(subs);
122122
sub.onComplete();
123123

124-
env.verifyNoAsyncErrors();
124+
env.verifyNoAsyncErrorsNoDelay();
125125
}
126126
});
127127
}
@@ -160,7 +160,7 @@ public void cancel() {
160160
sub.onSubscribe(subs);
161161
sub.onError(new TestException());
162162

163-
env.verifyNoAsyncErrors();
163+
env.verifyNoAsyncErrorsNoDelay();
164164
}
165165
});
166166
}
@@ -196,7 +196,7 @@ public String toString() {
196196
});
197197

198198
secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
199-
env.verifyNoAsyncErrors();
199+
env.verifyNoAsyncErrorsNoDelay();
200200
}};
201201
}
202202

@@ -251,7 +251,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
251251
probe.expectCompletion();
252252
probe.expectNone();
253253

254-
env.verifyNoAsyncErrors();
254+
env.verifyNoAsyncErrorsNoDelay();
255255
}
256256
});
257257
}
@@ -275,7 +275,7 @@ public void subscribe(Subscriber<? super T> s) {
275275
pub.subscribe(probe);
276276
probe.expectCompletion();
277277

278-
env.verifyNoAsyncErrors();
278+
env.verifyNoAsyncErrorsNoDelay();
279279
}
280280
});
281281
}
@@ -329,7 +329,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
329329
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
330330
}
331331

332-
env.verifyNoAsyncErrors();
332+
env.verifyNoAsyncErrorsNoDelay();
333333
}
334334
});
335335
}
@@ -357,7 +357,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
357357
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
358358
}
359359

360-
env.verifyNoAsyncErrors();
360+
env.verifyNoAsyncErrorsNoDelay();
361361
}
362362
});
363363
}
@@ -385,7 +385,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
385385
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
386386
}
387387

388-
env.verifyNoAsyncErrors();
388+
env.verifyNoAsyncErrorsNoDelay();
389389
}
390390
});
391391
}

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void cancel() {
149149
sub.onSubscribe(subs);
150150
sub.onComplete();
151151

152-
env.verifyNoAsyncErrors();
152+
env.verifyNoAsyncErrorsNoDelay();
153153
}
154154
});
155155
}
@@ -190,7 +190,7 @@ public void cancel() {
190190
sub.onSubscribe(subs);
191191
sub.onError(new TestException());
192192

193-
env.verifyNoAsyncErrors();
193+
env.verifyNoAsyncErrorsNoDelay();
194194
}
195195
});
196196
}
@@ -226,7 +226,7 @@ public String toString() {
226226
});
227227

228228
secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
229-
env.verifyNoAsyncErrors();
229+
env.verifyNoAsyncErrorsNoDelay();
230230
}};
231231
}
232232

@@ -303,7 +303,7 @@ public void run(WhiteboxTestStage stage) throws InterruptedException {
303303
stage.sendError(ex);
304304
stage.probe.expectError(ex);
305305

306-
env.verifyNoAsyncErrors();
306+
env.verifyNoAsyncErrorsNoDelay();
307307
}
308308
});
309309
}
@@ -318,7 +318,7 @@ public void run(WhiteboxTestStage stage) throws InterruptedException {
318318
stage.sendError(ex);
319319
stage.probe.expectError(ex);
320320

321-
env.verifyNoAsyncErrors();
321+
env.verifyNoAsyncErrorsNoDelay();
322322
}
323323
});
324324
}
@@ -359,7 +359,7 @@ public void run(WhiteboxTestStage stage) throws Throwable {
359359
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
360360
}
361361

362-
env.verifyNoAsyncErrors();
362+
env.verifyNoAsyncErrorsNoDelay();
363363
}
364364
});
365365
}// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
@@ -391,7 +391,7 @@ public void cancel() {
391391
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
392392
}
393393

394-
env.verifyNoAsyncErrors();
394+
env.verifyNoAsyncErrorsNoDelay();
395395
}
396396
});
397397
}
@@ -425,7 +425,7 @@ public void cancel() {
425425
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
426426
}
427427

428-
env.verifyNoAsyncErrors();
428+
env.verifyNoAsyncErrorsNoDelay();
429429
}
430430
});
431431
}
@@ -570,7 +570,7 @@ public T nextT() throws InterruptedException {
570570
}
571571

572572
public void verifyNoAsyncErrors() {
573-
env.verifyNoAsyncErrors();
573+
env.verifyNoAsyncErrorsNoDelay();
574574
}
575575
}
576576

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+33-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
77
import org.reactivestreams.tck.support.Optional;
88

9-
import java.text.NumberFormat;
109
import java.util.LinkedList;
1110
import java.util.List;
1211
import java.util.concurrent.ArrayBlockingQueue;
@@ -107,7 +106,7 @@ public static long envDefaultTimeoutMillis() {
107106
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
108107
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
109108
*
110-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
109+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
111110
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
112111
* from the environment using {@code env.dropAsyncError()}.
113112
*
@@ -127,7 +126,7 @@ public void flop(String msg) {
127126
*
128127
* This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
129128
*
130-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
129+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
131130
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
132131
* from the environment using {@code env.dropAsyncError()}.
133132
*
@@ -147,7 +146,7 @@ public void flop(Throwable thr, String msg) {
147146
*
148147
* This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
149148
*
150-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
149+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
151150
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
152151
* from the environment using {@code env.dropAsyncError()}.
153152
*
@@ -167,7 +166,7 @@ public void flop(Throwable thr) {
167166
* This method DOES fail the test right away (it tries to, by throwing an AssertionException),
168167
* in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
169168
*
170-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
169+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
171170
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
172171
* from the environment using {@code env.dropAsyncError()}.
173172
*
@@ -192,7 +191,7 @@ public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws Interr
192191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
193192
pub.subscribe(sub);
194193
sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
195-
verifyNoAsyncErrors();
194+
verifyNoAsyncErrorsNoDelay();
196195
}
197196

198197
public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
@@ -223,16 +222,39 @@ public Throwable dropAsyncError() {
223222
}
224223
}
225224

225+
/**
226+
* Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
227+
* were signalled pior to, or during that time (by calling {@code flop()}).
228+
*/
229+
public void verifyNoAsyncErrors() {
230+
verifyNoAsyncErrors(defaultTimeoutMillis());
231+
}
232+
233+
/**
234+
* This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
235+
* asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
236+
* <p></p>
237+
* It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
238+
* and if no errors encountered wait for another default timeout as the errors may yet be signalled.
239+
* The initial check is performed in order to fail-fast in case of an already failed test.
240+
*/
226241
public void verifyNoAsyncErrors(long delay) {
227242
try {
243+
verifyNoAsyncErrorsNoDelay();
244+
228245
Thread.sleep(delay);
229-
verifyNoAsyncErrors();
246+
verifyNoAsyncErrorsNoDelay();
230247
} catch (InterruptedException e) {
231248
throw new RuntimeException(e);
232249
}
233250
}
234251

235-
public void verifyNoAsyncErrors() {
252+
/**
253+
* Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
254+
* This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
255+
* for example in tight loops etc.
256+
*/
257+
public void verifyNoAsyncErrorsNoDelay() {
236258
for (Throwable e : asyncErrors) {
237259
if (e instanceof AssertionError) {
238260
throw (AssertionError) e;
@@ -267,6 +289,9 @@ public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
267289

268290
// ---- classes ----
269291

292+
/**
293+
* {@link Subscriber} implementation which can be steered by test code and asserted on.
294+
*/
270295
public static class ManualSubscriber<T> extends TestSubscriber<T> {
271296
Receptacle<T> received;
272297

0 commit comments

Comments
 (0)