Skip to content

TCK: Receptacle#expectError timeout approach #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions tck-flow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,32 +209,39 @@ within the TCK which await for something to happen. The other timeout is `publis
[Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped
by the Publisher.

Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``),
and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``).
While the latter defaults to the prior, it may be useful to tweak them independently when running on continious
integration servers (for example, keeping the no-signals timeout significantly lower).

In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
Note that the TCK differentiates between timeouts for "waiting for a signal"
(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time"
(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the prior, it may be useful to tweak
them independently when running on continuous integration servers (for example, keeping the
no-signals timeout significantly lower). Another configuration option is the "poll timeout" which is
used whenever an operation has to poll for a `defaultTimeoutMillis` for a signal to appear (most
often errors), it can then poll and check using the `defaultPollTimeoutMillis`, for the expected
error, rather than blocking for the full default timeout.

In order to configure these timeouts (for example when running on a slow continuous integration
machine), you can either:

**Use env variables** to set these timeouts, in which case the you can do:

```bash
export DEFAULT_TIMEOUT_MILLIS=100
export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100
export DEFAULT_POLL_TIMEOUT_MILLIS=20
export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
```

Or **define the timeouts explicitly in code**:

```java
public class RangePublisherTest extends FlowPublisherVerification<Integer> {
public class RangePublisherTest extends PublisherVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 100L;
public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L;

public RangePublisherTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
}

// ...
Expand Down
23 changes: 15 additions & 8 deletions tck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis
[Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped
by the Publisher.

Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``),
and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``).
While the latter defaults to the prior, it may be useful to tweak them independently when running on continious
integration servers (for example, keeping the no-signals timeout significantly lower).

In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
Note that the TCK differentiates between timeouts for "waiting for a signal"
(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time"
(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the prior, it may be useful to tweak
them independently when running on continuous integration servers (for example, keeping the
no-signals timeout significantly lower). Another configuration option is the "poll timeout" which is
used whenever an operation has to poll for a `defaultTimeoutMillis` for a signal to appear (most
often errors), it can then poll and check using the `defaultPollTimeoutMillis`, for the expected
error, rather than blocking for the full default timeout.

In order to configure these timeouts (for example when running on a slow continuous integration
machine), you can either:

**Use env variables** to set these timeouts, in which case the you can do:

```bash
export DEFAULT_TIMEOUT_MILLIS=100
export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100
export DEFAULT_POLL_TIMEOUT_MILLIS=20
export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
```

Expand All @@ -231,10 +237,11 @@ public class RangePublisherTest extends PublisherVerification<Integer> {

public static final long DEFAULT_TIMEOUT_MILLIS = 100L;
public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L;

public RangePublisherTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
}

// ...
Expand Down
151 changes: 123 additions & 28 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
import org.reactivestreams.tck.flow.support.Optional;
import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;

import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -24,9 +24,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -37,8 +38,10 @@ public class TestEnvironment {
private static final long DEFAULT_TIMEOUT_MILLIS = 100;

private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV";

private final long defaultTimeoutMillis;
private final long defaultPollTimeoutMillis;
private final long defaultNoSignalsTimeoutMillis;
private final boolean printlnDebug;

Expand All @@ -51,14 +54,46 @@ public class TestEnvironment {
* run the tests.
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
* @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
* preempted by an asynchronous event.
* @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
*/
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis,
boolean printlnDebug) {
this.defaultTimeoutMillis = defaultTimeoutMillis;
this.defaultPollTimeoutMillis = defaultPollTimeoutMillis;
this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
this.printlnDebug = printlnDebug;
}

/**
* Tests must specify the timeout for expected outcome of asynchronous
* interactions. Longer timeout does not invalidate the correctness of
* the implementation, but can in some cases result in longer time to
* run the tests.
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
* @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
*/
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug);
}

/**
* Tests must specify the timeout for expected outcome of asynchronous
* interactions. Longer timeout does not invalidate the correctness of
* the implementation, but can in some cases result in longer time to
* run the tests.
*
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
* @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
* preempted by an asynchronous event.
*/
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) {
this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false);
}

/**
* Tests must specify the timeout for expected outcome of asynchronous
* interactions. Longer timeout does not invalidate the correctness of
Expand All @@ -69,7 +104,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
*/
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false);
this(defaultTimeoutMillis, defaultTimeoutMillis, defaultNoSignalsTimeoutMillis);
}

/**
Expand All @@ -81,7 +116,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
*/
public TestEnvironment(long defaultTimeoutMillis) {
this(defaultTimeoutMillis, defaultTimeoutMillis, false);
this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis);
}

/**
Expand All @@ -97,7 +132,7 @@ public TestEnvironment(long defaultTimeoutMillis) {
* often helpful to pinpoint simple race conditions etc.
*/
public TestEnvironment(boolean printlnDebug) {
this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug);
this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug);
}

/**
Expand Down Expand Up @@ -126,6 +161,14 @@ public long defaultNoSignalsTimeoutMillis() {
return defaultNoSignalsTimeoutMillis;
}

/**
* The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
* event.
*/
public long defaultPollTimeoutMillis() {
return defaultPollTimeoutMillis;
}

/**
* Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
*
Expand Down Expand Up @@ -156,6 +199,21 @@ public static long envDefaultNoSignalsTimeoutMillis() {
}
}

/**
* Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
*
* @throws java.lang.IllegalArgumentException when unable to parse the env variable
*/
public static long envDefaultPollTimeoutMillis() {
final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV);
if (envMillis == null) return envDefaultTimeoutMillis();
else try {
return Long.parseLong(envMillis);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex);
}
}

/**
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
Expand Down Expand Up @@ -277,7 +335,7 @@ public Throwable dropAsyncError() {
}

/**
* Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
* Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors
* were signalled pior to, or during that time (by calling {@code flop()}).
*/
public void verifyNoAsyncErrors() {
Expand Down Expand Up @@ -519,42 +577,57 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
}

public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis());
expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
}
public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis());
expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
}

@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
}

public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
final E err = expectError(expected, timeoutMillis);
expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis);
}

public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives,
long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis);
final String message = err.getMessage();

boolean contains = false;
for (String requiredMessagePart : requiredMessagePartAlternatives)
for (String requiredMessagePart : requiredMessagePartAlternatives)
if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to
assertTrue(contains,
String.format("Got expected exception [%s] but missing message part [%s], was: %s",
err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
String.format("Got expected exception [%s] but missing message part [%s], was: %s",
err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
}

public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
return expectError(expected, env.defaultTimeoutMillis());
}

public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName()));
return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis());
}

public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
return expectError(expected, env.defaultTimeoutMillis(), errorMsg);
}

public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
return received.expectError(expected, timeoutMillis, errorMsg);
return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg);
}

public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName()));
}

public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis,
String errorMsg) throws Exception {
return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg);
}

public void expectNone() throws InterruptedException {
Expand Down Expand Up @@ -1025,22 +1098,44 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
} // else, ok
}

@SuppressWarnings("unchecked")
/**
* @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
*/
@Deprecated
public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
Thread.sleep(timeoutMillis);

if (env.asyncErrors.isEmpty()) {
return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
} else {
// ok, there was an expected error
Throwable thrown = env.asyncErrors.remove(0);
return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg);
}

if (clazz.isInstance(thrown)) {
return (E) thrown;
@SuppressWarnings("unchecked")
final <E extends Throwable> E expectError(Class<E> clazz, long totalTimeoutMillis,
long pollTimeoutMillis,
String errorMsg) throws Exception {
long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis);
long timeStampANs = System.nanoTime();
long timeStampBNs;

for (;;) {
Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs)));

if (env.asyncErrors.isEmpty()) {
timeStampBNs = System.nanoTime();
totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
timeStampANs = timeStampBNs;

if (totalTimeoutRemainingNs <= 0) {
return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis));
}
} else {
// ok, there was an expected error
Throwable thrown = env.asyncErrors.remove(0);

if (clazz.isInstance(thrown)) {
return (E) thrown;
} else {

return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
}
}
}
}
Expand Down