Skip to content

Commit 3c82229

Browse files
jroperviktorklang
authored andcommitted
Ensure TCK passes with empty publisher that synchronously completes
Fixes #422
1 parent 8d68455 commit 3c82229

File tree

3 files changed

+51
-2
lines changed

3 files changed

+51
-2
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.CopyOnWriteArrayList;
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2728

2829
import static org.testng.Assert.assertTrue;
2930
import static org.testng.Assert.fail;
@@ -570,6 +571,8 @@ public void expectNone(long withinMillis, String errMsgPrefix) throws Interrupte
570571

571572
public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
572573

574+
private final AtomicBoolean onSubscribeCalled = new AtomicBoolean();
575+
573576
public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
574577
super(env);
575578
}
@@ -587,7 +590,7 @@ public void onNext(T element) {
587590
@Override
588591
public void onComplete() {
589592
env.debug(this + "::onComplete()");
590-
if (subscription.isCompleted()) {
593+
if (onSubscribeCalled.get()) {
591594
super.onComplete();
592595
} else {
593596
env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
@@ -599,6 +602,7 @@ public void onSubscribe(Subscription s) {
599602
env.debug(String.format("%s::onSubscribe(%s)", this, s));
600603
if (!subscription.isCompleted()) {
601604
subscription.complete(s);
605+
onSubscribeCalled.set(true);
602606
} else {
603607
env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
604608
}
@@ -607,7 +611,7 @@ public void onSubscribe(Subscription s) {
607611
@Override
608612
public void onError(Throwable cause) {
609613
env.debug(String.format("%s::onError(%s)", this, cause));
610-
if (subscription.isCompleted()) {
614+
if (onSubscribeCalled.get()) {
611615
super.onError(cause);
612616
} else {
613617
env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));

Diff for: tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,39 @@ public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shou
228228
}, "Subscriber::onComplete() called before Subscriber::onSubscribe");
229229
}
230230

231+
@Test
232+
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldPass_whenOnCompleteSynchronouslyInovked() throws Throwable {
233+
final Publisher<Integer> synchronousOnCompletePublisher = new Publisher<Integer>() {
234+
@Override public void subscribe(final Subscriber<? super Integer> s) {
235+
s.onSubscribe(new Subscription() {
236+
@Override public void request(long n) { }
237+
@Override public void cancel() { }
238+
});
239+
s.onComplete();
240+
}
241+
};
242+
243+
requireOptionalTestPass(new ThrowingRunnable() {
244+
@Override public void run() throws Throwable {
245+
PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
246+
@Override public Publisher<Integer> createPublisher(long elements) {
247+
return synchronousOnCompletePublisher;
248+
}
249+
250+
@Override public long maxElementsFromPublisher() {
251+
return 0; // it is an "empty" Publisher
252+
}
253+
254+
@Override public Publisher<Integer> createFailedPublisher() {
255+
return null;
256+
}
257+
};
258+
259+
verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
260+
}
261+
});
262+
}
263+
231264
@Test
232265
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
233266
requireTestFailure(new ThrowingRunnable() {

Diff for: tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java

+12
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ public void requireTestSkip(ThrowingRunnable run, String msgPart) {
8181
throw new RuntimeException("Expected TCK to SKIP this test, instead if PASSed!");
8282
}
8383

84+
public void requireOptionalTestPass(ThrowingRunnable run) {
85+
try {
86+
run.run();
87+
} catch (SkipException skip) {
88+
throw new RuntimeException("Expected TCK to PASS this test, instead it was SKIPPED", skip.getCause());
89+
} catch (Throwable throwable) {
90+
throw new RuntimeException(
91+
String.format("Expected TCK to PASS this test, yet it threw %s(%s) instead!",
92+
throwable.getClass().getName(), throwable.getMessage()), throwable);
93+
}
94+
}
95+
8496
/**
8597
* This publisher does NOT fulfil all Publisher spec requirements.
8698
* It's just the bare minimum to enable this test to fail the Subscriber tests.

0 commit comments

Comments
 (0)