|
33 | 33 | */
|
34 | 34 | public abstract class PublisherVerification<T> implements PublisherVerificationRules {
|
35 | 35 |
|
| 36 | + private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; |
| 37 | + private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; |
| 38 | + |
36 | 39 | private final TestEnvironment env;
|
37 | 40 | private final long publisherReferenceGCTimeoutMillis;
|
38 | 41 |
|
| 42 | + /** |
| 43 | + * Constructs a new verification class using the given env and configuration. |
| 44 | + * |
| 45 | + * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. |
| 46 | + */ |
39 | 47 | public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
|
40 | 48 | this.env = env;
|
41 | 49 | this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
|
42 | 50 | }
|
43 | 51 |
|
| 52 | + /** |
| 53 | + * Constructs a new verification class using the given env and configuration. |
| 54 | + * |
| 55 | + * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. |
| 56 | + */ |
| 57 | + public PublisherVerification(TestEnvironment env) { |
| 58 | + this.env = env; |
| 59 | + this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); |
| 60 | + } |
| 61 | + |
| 62 | + /** |
| 63 | + * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, |
| 64 | + * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). |
| 65 | + * |
| 66 | + * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. |
| 67 | + * |
| 68 | + * @throws java.lang.IllegalArgumentException when unable to parse the env variable |
| 69 | + */ |
| 70 | + public static long envPublisherReferenceGCTimeoutMillis() { |
| 71 | + final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); |
| 72 | + if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; |
| 73 | + else try { |
| 74 | + return Long.parseLong(envMillis); |
| 75 | + } catch(NumberFormatException ex) { |
| 76 | + throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); |
| 77 | + } |
| 78 | + } |
| 79 | + |
44 | 80 | /**
|
45 | 81 | * This is the main method you must implement in your test incarnation.
|
46 | 82 | * It must create a Publisher for a stream with exactly the given number of elements.
|
@@ -88,6 +124,14 @@ public long boundedDepthOfOnNextAndRequestRecursion() {
|
88 | 124 | return 1;
|
89 | 125 | }
|
90 | 126 |
|
| 127 | + /** |
| 128 | + * The amount of time after which a cancelled Subscriber reference should be dropped. |
| 129 | + * See Rule 3.13 for details. |
| 130 | + */ |
| 131 | + final public long publisherReferenceGCTimeoutMillis() { |
| 132 | + return publisherReferenceGCTimeoutMillis; |
| 133 | + } |
| 134 | + |
91 | 135 | ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
|
92 | 136 |
|
93 | 137 | @BeforeMethod
|
@@ -298,20 +342,20 @@ public void onComplete() {
|
298 | 342 | public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
|
299 | 343 | try {
|
300 | 344 | whenHasErrorPublisherTest(new PublisherTestRun<T>() {
|
301 |
| - @Override |
302 |
| - public void run(final Publisher<T> pub) throws InterruptedException { |
303 |
| - final Latch latch = new Latch(env); |
304 |
| - pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { |
305 |
| - @Override |
306 |
| - public void onError(Throwable cause) { |
307 |
| - latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); |
308 |
| - latch.close(); |
309 |
| - } |
310 |
| - }); |
| 345 | + @Override |
| 346 | + public void run(final Publisher<T> pub) throws InterruptedException { |
| 347 | + final Latch latch = new Latch(env); |
| 348 | + pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { |
| 349 | + @Override |
| 350 | + public void onError(Throwable cause) { |
| 351 | + latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); |
| 352 | + latch.close(); |
| 353 | + } |
| 354 | + }); |
311 | 355 |
|
312 |
| - latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); |
| 356 | + latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); |
313 | 357 |
|
314 |
| - env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); |
| 358 | + env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); |
315 | 359 | }
|
316 | 360 | });
|
317 | 361 | } catch (SkipException se) {
|
@@ -404,25 +448,25 @@ public void run(Publisher<T> pub) throws Throwable {
|
404 | 448 | @Override @Test
|
405 | 449 | public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
|
406 | 450 | whenHasErrorPublisherTest(new PublisherTestRun<T>() {
|
407 |
| - @Override |
408 |
| - public void run(Publisher<T> pub) throws Throwable { |
409 |
| - final Latch onErrorLatch = new Latch(env); |
410 |
| - ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { |
411 |
| - @Override |
412 |
| - public void onError(Throwable cause) { |
413 |
| - onErrorLatch.assertOpen("Only one onError call expected"); |
414 |
| - onErrorLatch.close(); |
415 |
| - } |
| 451 | + @Override |
| 452 | + public void run(Publisher<T> pub) throws Throwable { |
| 453 | + final Latch onErrorLatch = new Latch(env); |
| 454 | + ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { |
| 455 | + @Override |
| 456 | + public void onError(Throwable cause) { |
| 457 | + onErrorLatch.assertOpen("Only one onError call expected"); |
| 458 | + onErrorLatch.close(); |
| 459 | + } |
416 | 460 |
|
417 |
| - @Override |
418 |
| - public void onSubscribe(Subscription subs) { |
419 |
| - env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber"); |
420 |
| - } |
421 |
| - }; |
422 |
| - pub.subscribe(sub); |
423 |
| - onErrorLatch.assertClosed("Should have received onError"); |
| 461 | + @Override |
| 462 | + public void onSubscribe(Subscription subs) { |
| 463 | + env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber"); |
| 464 | + } |
| 465 | + }; |
| 466 | + pub.subscribe(sub); |
| 467 | + onErrorLatch.assertClosed("Should have received onError"); |
424 | 468 |
|
425 |
| - env.verifyNoAsyncErrors(); |
| 469 | + env.verifyNoAsyncErrors(); |
426 | 470 | }
|
427 | 471 | });
|
428 | 472 | }
|
|
0 commit comments