Skip to content

Commit d7a2e1d

Browse files
committed
+TCK verifyNoAsyncErrors now by default waits, fixes spec111
Resolves reactive-streams#239
1 parent bca27f5 commit d7a2e1d

5 files changed

+69
-44
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ public TestSetup apply(Long aLong) throws Throwable {
408408
sub1.expectError(ex);
409409
sub2.expectError(ex);
410410

411-
env.verifyNoAsyncErrors();
411+
env.verifyNoAsyncErrorsNoDelay();
412412
}};
413413
}
414414
});
@@ -480,7 +480,7 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
480480
sendError(ex);
481481
sub.expectError(ex); // "immediately", i.e. without a preceding request
482482

483-
env.verifyNoAsyncErrors();
483+
env.verifyNoAsyncErrorsNoDelay();
484484
}};
485485
}
486486

@@ -668,7 +668,7 @@ public TestSetup apply(Long subscribers) throws Throwable {
668668
sub1.expectCompletion(env.defaultTimeoutMillis());
669669
sub2.expectCompletion(env.defaultTimeoutMillis());
670670

671-
env.verifyNoAsyncErrors();
671+
env.verifyNoAsyncErrorsNoDelay();
672672
}};
673673
}
674674
});

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -460,9 +460,10 @@ public void run(Publisher<T> pub) throws Throwable {
460460
try {
461461
pub.subscribe(null);
462462
env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
463-
} catch (NullPointerException npe) {
463+
} catch (NullPointerException ignored) {
464+
// valid behaviour
464465
}
465-
env.verifyNoAsyncErrors();
466+
env.verifyNoAsyncErrorsNoDelay();
466467
}
467468
});
468469
}
@@ -497,7 +498,7 @@ public void onComplete() {
497498
}
498499
});
499500
onSubscribeLatch.expectClose("Should have received onSubscribe");
500-
env.verifyNoAsyncErrors();
501+
env.verifyNoAsyncErrorsNoDelay();
501502
}
502503
});
503504
}
@@ -528,7 +529,7 @@ public void onSubscribe(Subscription subs) {
528529
onSubscribeLatch.expectClose("Should have received onSubscribe");
529530
onErrorLatch.expectClose("Should have received onError");
530531

531-
env.verifyNoAsyncErrors();
532+
env.verifyNoAsyncErrorsNoDelay();
532533
}
533534
});
534535
}
@@ -777,7 +778,7 @@ public void onError(Throwable cause) {
777778
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
778779
oneMoreThanBoundedLimit);
779780
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
780-
env.verifyNoAsyncErrors();
781+
env.verifyNoAsyncErrorsNoDelay();
781782
} finally {
782783
// since the request/onNext recursive calls may keep the publisher running "forever",
783784
// we MUST cancel it manually before exiting this test case
@@ -828,7 +829,7 @@ public void cancel() {
828829
sub.request(1);
829830

830831
sub.expectNone();
831-
env.verifyNoAsyncErrors();
832+
env.verifyNoAsyncErrorsNoDelay();
832833
}
833834
});
834835
}
@@ -849,7 +850,7 @@ public void run(Publisher<T> pub) throws Throwable {
849850
subs.cancel();
850851

851852
sub.expectNone();
852-
env.verifyNoAsyncErrors();
853+
env.verifyNoAsyncErrorsNoDelay();
853854
}
854855
});
855856
}
@@ -940,7 +941,7 @@ > AsyncPublisher receives cancel() - handles it right away, by "stopping itself"
940941
}
941942
});
942943

943-
env.verifyNoAsyncErrors();
944+
env.verifyNoAsyncErrorsNoDelay();
944945
}
945946

946947
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13
@@ -976,7 +977,7 @@ public void run(Publisher<T> pub) throws Throwable {
976977
env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
977978
}
978979

979-
env.verifyNoAsyncErrors();
980+
env.verifyNoAsyncErrorsNoDelay();
980981
}
981982
});
982983
}
@@ -995,7 +996,7 @@ public void run(Publisher<T> pub) throws Throwable {
995996
sub.nextElements(totalElements);
996997
sub.expectCompletion();
997998

998-
env.verifyNoAsyncErrors();
999+
env.verifyNoAsyncErrorsNoDelay();
9991000
}
10001001
});
10011002
}
@@ -1017,7 +1018,7 @@ public void run(Publisher<T> pub) throws Throwable {
10171018
sub.expectCompletion();
10181019

10191020
try {
1020-
env.verifyNoAsyncErrors();
1021+
env.verifyNoAsyncErrorsNoDelay();
10211022
} finally {
10221023
sub.cancel();
10231024
}
@@ -1090,7 +1091,7 @@ public void activePublisherTest(long elements, boolean completionSignalRequired,
10901091
} else {
10911092
Publisher<T> pub = createPublisher(elements);
10921093
body.run(pub);
1093-
env.verifyNoAsyncErrors();
1094+
env.verifyNoAsyncErrorsNoDelay();
10941095
}
10951096
}
10961097

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void cancel() {
121121
sub.onSubscribe(subs);
122122
sub.onComplete();
123123

124-
env.verifyNoAsyncErrors();
124+
env.verifyNoAsyncErrorsNoDelay();
125125
}
126126
});
127127
}
@@ -160,7 +160,7 @@ public void cancel() {
160160
sub.onSubscribe(subs);
161161
sub.onError(new TestException());
162162

163-
env.verifyNoAsyncErrors();
163+
env.verifyNoAsyncErrorsNoDelay();
164164
}
165165
});
166166
}
@@ -196,7 +196,7 @@ public String toString() {
196196
});
197197

198198
secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
199-
env.verifyNoAsyncErrors();
199+
env.verifyNoAsyncErrorsNoDelay();
200200
}};
201201
}
202202

@@ -251,7 +251,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
251251
probe.expectCompletion();
252252
probe.expectNone();
253253

254-
env.verifyNoAsyncErrors();
254+
env.verifyNoAsyncErrorsNoDelay();
255255
}
256256
});
257257
}
@@ -275,7 +275,7 @@ public void subscribe(Subscriber<? super T> s) {
275275
pub.subscribe(probe);
276276
probe.expectCompletion();
277277

278-
env.verifyNoAsyncErrors();
278+
env.verifyNoAsyncErrorsNoDelay();
279279
}
280280
});
281281
}
@@ -329,7 +329,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
329329
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
330330
}
331331

332-
env.verifyNoAsyncErrors();
332+
env.verifyNoAsyncErrorsNoDelay();
333333
}
334334
});
335335
}
@@ -357,7 +357,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
357357
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
358358
}
359359

360-
env.verifyNoAsyncErrors();
360+
env.verifyNoAsyncErrorsNoDelay();
361361
}
362362
});
363363
}
@@ -385,7 +385,7 @@ public void run(BlackboxTestStage stage) throws Throwable {
385385
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
386386
}
387387

388-
env.verifyNoAsyncErrors();
388+
env.verifyNoAsyncErrorsNoDelay();
389389
}
390390
});
391391
}

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+12-13
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void cancel() {
149149
sub.onSubscribe(subs);
150150
sub.onComplete();
151151

152-
env.verifyNoAsyncErrors();
152+
env.verifyNoAsyncErrorsNoDelay();
153153
}
154154
});
155155
}
@@ -190,7 +190,7 @@ public void cancel() {
190190
sub.onSubscribe(subs);
191191
sub.onError(new TestException());
192192

193-
env.verifyNoAsyncErrors();
193+
env.verifyNoAsyncErrorsNoDelay();
194194
}
195195
});
196196
}
@@ -307,7 +307,7 @@ public void run(WhiteboxTestStage stage) throws InterruptedException {
307307
stage.sendError(ex);
308308
stage.probe.expectError(ex);
309309

310-
env.verifyNoAsyncErrors();
310+
env.verifyNoAsyncErrorsNoDelay();
311311
}
312312
});
313313
}
@@ -322,7 +322,7 @@ public void run(WhiteboxTestStage stage) throws InterruptedException {
322322
stage.sendError(ex);
323323
stage.probe.expectError(ex);
324324

325-
env.verifyNoAsyncErrors();
325+
env.verifyNoAsyncErrorsNoDelay();
326326
}
327327
});
328328
}
@@ -358,10 +358,10 @@ public void run(WhiteboxTestStage stage) throws Throwable {
358358
sub.onSubscribe(null);
359359
} catch (final NullPointerException expected) {
360360
gotNPE = true;
361-
}
362-
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
361+
}
363362

364-
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
363+
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
364+
env.verifyNoAsyncErrorsNoDelay();
365365
}
366366
});
367367
}
@@ -379,11 +379,10 @@ public void run(WhiteboxTestStage stage) throws Throwable {
379379
sub.onNext(null);
380380
} catch (final NullPointerException expected) {
381381
gotNPE = true;
382-
} finally {
383-
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
384382
}
385383

386-
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
384+
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
385+
env.verifyNoAsyncErrorsNoDelay();
387386
}
388387
});
389388
}
@@ -405,7 +404,7 @@ public void run(WhiteboxTestStage stage) throws Throwable {
405404
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
406405
}
407406

408-
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
407+
env.verifyNoAsyncErrorsNoDelay();
409408
}
410409
});
411410
}
@@ -566,7 +565,7 @@ public T nextT() throws InterruptedException {
566565
}
567566

568567
public void verifyNoAsyncErrors() {
569-
env.verifyNoAsyncErrors();
568+
env.verifyNoAsyncErrorsNoDelay();
570569
}
571570
}
572571

@@ -737,7 +736,7 @@ private SubscriberPuppet puppet() {
737736
public void registerOnSubscribe(SubscriberPuppet p) {
738737
if (!puppet.isCompleted()) {
739738
puppet.complete(p);
740-
}
739+
}
741740
}
742741

743742
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+33-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
77
import org.reactivestreams.tck.support.Optional;
88

9-
import java.text.NumberFormat;
109
import java.util.LinkedList;
1110
import java.util.List;
1211
import java.util.concurrent.ArrayBlockingQueue;
@@ -107,7 +106,7 @@ public static long envDefaultTimeoutMillis() {
107106
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
108107
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
109108
*
110-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
109+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
111110
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
112111
* from the environment using {@code env.dropAsyncError()}.
113112
*
@@ -127,7 +126,7 @@ public void flop(String msg) {
127126
*
128127
* This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
129128
*
130-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
129+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
131130
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
132131
* from the environment using {@code env.dropAsyncError()}.
133132
*
@@ -147,7 +146,7 @@ public void flop(Throwable thr, String msg) {
147146
*
148147
* This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
149148
*
150-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
149+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
151150
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
152151
* from the environment using {@code env.dropAsyncError()}.
153152
*
@@ -167,7 +166,7 @@ public void flop(Throwable thr) {
167166
* This method DOES fail the test right away (it tries to, by throwing an AssertionException),
168167
* in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
169168
*
170-
* Use {@code env.verifyNoAsyncErrors()} at the end of your TCK tests to verify there no flops called during it's execution.
169+
* Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
171170
* To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
172171
* from the environment using {@code env.dropAsyncError()}.
173172
*
@@ -192,7 +191,7 @@ public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws Interr
192191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
193192
pub.subscribe(sub);
194193
sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
195-
verifyNoAsyncErrors();
194+
verifyNoAsyncErrorsNoDelay();
196195
}
197196

198197
public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
@@ -223,16 +222,39 @@ public Throwable dropAsyncError() {
223222
}
224223
}
225224

225+
/**
226+
* Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
227+
* were signalled pior to, or during that time (by calling {@code flop()}).
228+
*/
229+
public void verifyNoAsyncErrors() {
230+
verifyNoAsyncErrors(defaultTimeoutMillis());
231+
}
232+
233+
/**
234+
* This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
235+
* asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
236+
* <p></p>
237+
* It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
238+
* and if no errors encountered wait for another default timeout as the errors may yet be signalled.
239+
* The initial check is performed in order to fail-fast in case of an already failed test.
240+
*/
226241
public void verifyNoAsyncErrors(long delay) {
227242
try {
243+
verifyNoAsyncErrorsNoDelay();
244+
228245
Thread.sleep(delay);
229-
verifyNoAsyncErrors();
246+
verifyNoAsyncErrorsNoDelay();
230247
} catch (InterruptedException e) {
231248
throw new RuntimeException(e);
232249
}
233250
}
234251

235-
public void verifyNoAsyncErrors() {
252+
/**
253+
* Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
254+
* This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
255+
* for example in tight loops etc.
256+
*/
257+
public void verifyNoAsyncErrorsNoDelay() {
236258
for (Throwable e : asyncErrors) {
237259
if (e instanceof AssertionError) {
238260
throw (AssertionError) e;
@@ -267,6 +289,9 @@ public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
267289

268290
// ---- classes ----
269291

292+
/**
293+
* {@link Subscriber} implementation which can be steered by test code and asserted on.
294+
*/
270295
public static class ManualSubscriber<T> extends TestSubscriber<T> {
271296
Receptacle<T> received;
272297

0 commit comments

Comments
 (0)