Skip to content

Ensure TCK passes with empty publisher that synchronously completes #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -191,7 +193,7 @@ public void flop(Throwable thr, String msg) {
asyncErrors.add(thr);
}
}

/**
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
Expand Down Expand Up @@ -801,7 +803,7 @@ public void expectCancelling() throws InterruptedException {
public void expectCancelling(long timeoutMillis) throws InterruptedException {
cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
}

public boolean isCancelled() throws InterruptedException {
return cancelled.isClosed();
}
Expand Down Expand Up @@ -882,34 +884,41 @@ public Promise(TestEnvironment env) {
}

private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
private volatile T _value = null;
private AtomicReference<T> _value = new AtomicReference<T>();

public T value() {
if (isCompleted()) {
return _value;
final T value = _value.get();
if (value != null) {
return value;
} else {
env.flop("Cannot access promise value before completion");
return null;
}
}

public boolean isCompleted() {
return _value != null;
return _value.get() != null;
}

/**
* Allows using expectCompletion to await for completion of the value and complete it _then_
*/
public void complete(T value) {
abq.add(value);
if (_value.compareAndSet(null, value)) {
// we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called
abq.add(value);
} else {
env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value));
}
}

/**
* Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
* Same as complete.
*
* Keeping this method for binary compatibility.
*/
public void completeImmediatly(T value) {
complete(value); // complete!
_value = value; // immediatly!
complete(value);
}

public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
Expand All @@ -918,8 +927,6 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru

if (val == null) {
env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
} else {
_value = val;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,39 @@ public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shou
}, "Subscriber::onComplete() called before Subscriber::onSubscribe");
}

@Test
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldPass_whenOnCompleteSynchronouslyInovked() throws Throwable {
final Publisher<Integer> synchronousOnCompletePublisher = new Publisher<Integer>() {
@Override public void subscribe(final Subscriber<? super Integer> s) {
s.onSubscribe(new Subscription() {
@Override public void request(long n) { }
@Override public void cancel() { }
});
s.onComplete();
}
};

requireOptionalTestPass(new ThrowingRunnable() {
@Override public void run() throws Throwable {
PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
@Override public Publisher<Integer> createPublisher(long elements) {
return synchronousOnCompletePublisher;
}

@Override public long maxElementsFromPublisher() {
return 0; // it is an "empty" Publisher
}

@Override public Publisher<Integer> createFailedPublisher() {
return null;
}
};

verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
}
});
}

@Test
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public void requireTestSkip(ThrowingRunnable run, String msgPart) {
throw new RuntimeException("Expected TCK to SKIP this test, instead if PASSed!");
}

public void requireOptionalTestPass(ThrowingRunnable run) {
try {
run.run();
} catch (SkipException skip) {
throw new RuntimeException("Expected TCK to PASS this test, instead it was SKIPPED", skip.getCause());
} catch (Throwable throwable) {
throw new RuntimeException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repackage Errors as Exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just being consistent with requireTestFailure and requireTestSkip.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not really matter much as those will end up failing the test in the suite and one may always look into the logs. It's not like it's a production app where we have to watch out to not accidentally continue after an OOM or something... Fine as is I think

String.format("Expected TCK to PASS this test, yet it threw %s(%s) instead!",
throwable.getClass().getName(), throwable.getMessage()), throwable);
}
}

/**
* This publisher does NOT fulfil all Publisher spec requirements.
* It's just the bare minimum to enable this test to fail the Subscriber tests.
Expand Down