Skip to content

Ensure TCK passes with empty publisher that synchronously completes #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -570,6 +571,8 @@ public void expectNone(long withinMillis, String errMsgPrefix) throws Interrupte

public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {

private final AtomicBoolean onSubscribeCalled = new AtomicBoolean();

public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
super(env);
}
Expand All @@ -587,7 +590,7 @@ public void onNext(T element) {
@Override
public void onComplete() {
env.debug(this + "::onComplete()");
if (subscription.isCompleted()) {
if (onSubscribeCalled.get()) {
super.onComplete();
} else {
env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
Expand All @@ -599,6 +602,7 @@ public void onSubscribe(Subscription s) {
env.debug(String.format("%s::onSubscribe(%s)", this, s));
if (!subscription.isCompleted()) {
subscription.complete(s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a completeImmediately() method on the promise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw. the plain complete() behavior is odd. I'd think once called, isCompleted() should report true but the implementation queues the object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree - but I'm not familiar enough with the TCK to change it. Though I can understand why the event is queued, so that the order of events can be verified and duplicates can be detected etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll investigate this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing, I think we can improve the impl there. I'll work on it today after done with work stuff.
I'll PR into this PR then :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reimplemented the Promise which makes this workwithout the additional atomic.
Please see here: jroper#1

onSubscribeCalled.set(true);
} else {
env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
}
Expand All @@ -607,7 +611,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onError(Throwable cause) {
env.debug(String.format("%s::onError(%s)", this, cause));
if (subscription.isCompleted()) {
if (onSubscribeCalled.get()) {
super.onError(cause);
} else {
env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,39 @@ public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shou
}, "Subscriber::onComplete() called before Subscriber::onSubscribe");
}

@Test
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldPass_whenOnCompleteSynchronouslyInovked() throws Throwable {
final Publisher<Integer> synchronousOnCompletePublisher = new Publisher<Integer>() {
@Override public void subscribe(final Subscriber<? super Integer> s) {
s.onSubscribe(new Subscription() {
@Override public void request(long n) { }
@Override public void cancel() { }
});
s.onComplete();
}
};

requireOptionalTestPass(new ThrowingRunnable() {
@Override public void run() throws Throwable {
PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
@Override public Publisher<Integer> createPublisher(long elements) {
return synchronousOnCompletePublisher;
}

@Override public long maxElementsFromPublisher() {
return 0; // it is an "empty" Publisher
}

@Override public Publisher<Integer> createFailedPublisher() {
return null;
}
};

verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
}
});
}

@Test
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public void requireTestSkip(ThrowingRunnable run, String msgPart) {
throw new RuntimeException("Expected TCK to SKIP this test, instead if PASSed!");
}

public void requireOptionalTestPass(ThrowingRunnable run) {
try {
run.run();
} catch (SkipException skip) {
throw new RuntimeException("Expected TCK to PASS this test, instead it was SKIPPED", skip.getCause());
} catch (Throwable throwable) {
throw new RuntimeException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repackage Errors as Exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just being consistent with requireTestFailure and requireTestSkip.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not really matter much as those will end up failing the test in the suite and one may always look into the logs. It's not like it's a production app where we have to watch out to not accidentally continue after an OOM or something... Fine as is I think

String.format("Expected TCK to PASS this test, yet it threw %s(%s) instead!",
throwable.getClass().getName(), throwable.getMessage()), throwable);
}
}

/**
* This publisher does NOT fulfil all Publisher spec requirements.
* It's just the bare minimum to enable this test to fail the Subscriber tests.
Expand Down