Skip to content

Commit 5094513

Browse files
committed
+tck reactive-streams#198 allow 0-length publishers to be tested
1 parent 737c1f2 commit 5094513

File tree

7 files changed

+102
-13
lines changed

7 files changed

+102
-13
lines changed

examples/src/test/java/org/reactivestreams/example/unicast/AsyncSubscriberTest.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.tck.SubscriberBlackboxVerification;
66
import org.reactivestreams.tck.TestEnvironment;
7+
import org.testng.annotations.AfterClass;
8+
import org.testng.annotations.BeforeClass;
79
import org.testng.annotations.Test;
8-
import static org.testng.Assert.assertEquals;
910

10-
import org.testng.annotations.BeforeClass;
11-
import org.testng.annotations.AfterClass;
12-
import java.util.concurrent.Executors;
13-
import java.util.concurrent.ExecutorService;
1411
import java.util.concurrent.CountDownLatch;
15-
import java.util.concurrent.atomic.AtomicLong;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
1614
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicLong;
16+
17+
import static org.testng.Assert.assertEquals;
1718

1819
@Test // Must be here for TestNG to find and run this, do not remove
1920
public class AsyncSubscriberTest extends SubscriberBlackboxVerification<Integer> {

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package org.reactivestreams.example.unicast;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import org.reactivestreams.Publisher;
64
import org.reactivestreams.tck.PublisherVerification;
75
import org.reactivestreams.tck.TestEnvironment;
6+
import org.testng.annotations.AfterClass;
7+
import org.testng.annotations.BeforeClass;
88
import org.testng.annotations.Test;
99

10-
import org.testng.annotations.BeforeClass;
11-
import org.testng.annotations.AfterClass;
12-
import java.util.concurrent.Executors;
1310
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
1412

1513
@Test // Must be here for TestNG to find and run this, do not remove
1614
public class IterablePublisherTest extends PublisherVerification<Integer> {

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+5
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() th
241241
publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
242242
}
243243

244+
@Override @Test
245+
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
246+
publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
247+
}
248+
244249
@Override @Test
245250
public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
246251
publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T
179179

180180
@Override @Test
181181
public void required_validate_maxElementsFromPublisher() throws Exception {
182-
assertTrue(maxElementsFromPublisher() > 0, "maxElementsFromPublisher MUST return a number > 0");
182+
assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
183183
}
184184

185185
@Override @Test
@@ -383,6 +383,20 @@ public void run(Publisher<T> pub) throws Throwable {
383383
});
384384
}
385385

386+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.5
387+
@Override @Test
388+
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
389+
activePublisherTest(0, true, new PublisherTestRun<T>() {
390+
@Override
391+
public void run(Publisher<T> pub) throws Throwable {
392+
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
393+
sub.request(1);
394+
sub.expectCompletion();
395+
sub.expectNone();
396+
}
397+
});
398+
}
399+
386400
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.6
387401
@Override @Test
388402
public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package org.reactivestreams.tck.support;
22

33

4-
54
/**
65
* Internal TCK use only.
76
* Add / Remove tests for PublisherVerification here to make sure that they arre added/removed in the other places.
@@ -16,6 +15,7 @@ public interface PublisherVerificationRules {
1615
void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable;
1716
void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable;
1817
void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable;
18+
void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable;
1919
void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable;
2020
void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable;
2121
void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.reactivestreams.tck;
2+
3+
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
4+
import org.reactivestreams.Publisher;
5+
import org.testng.annotations.AfterClass;
6+
import org.testng.annotations.BeforeClass;
7+
import org.testng.annotations.Test;
8+
9+
import java.util.Collections;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
@Test
14+
public class EmptyLazyPublisherTest extends PublisherVerification<Integer> {
15+
16+
private ExecutorService ex;
17+
18+
public EmptyLazyPublisherTest() {
19+
super(new TestEnvironment());
20+
}
21+
22+
@BeforeClass
23+
void before() { ex = Executors.newFixedThreadPool(4); }
24+
25+
@AfterClass
26+
void after() { if (ex != null) ex.shutdown(); }
27+
28+
@Override
29+
public Publisher<Integer> createPublisher(long elements) {
30+
return new AsyncIterablePublisher<Integer>(Collections.<Integer>emptyList(), ex);
31+
}
32+
33+
@Override
34+
public Publisher<Integer> createErrorStatePublisher() {
35+
return null;
36+
}
37+
38+
@Override
39+
public long maxElementsFromPublisher() {
40+
return 0;
41+
}
42+
}

tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,35 @@ public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates_shou
184184
}, "Expected end-of-stream but got element [3]");
185185
}
186186

187+
@Test
188+
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldNotAllowEagerOnComplete() throws Throwable {
189+
final Publisher<Integer> illegalEmptyEagerOnCompletePublisher = new Publisher<Integer>() {
190+
@Override public void subscribe(final Subscriber<? super Integer> s) {
191+
s.onComplete();
192+
}
193+
};
194+
195+
requireTestFailure(new ThrowingRunnable() {
196+
@Override public void run() throws Throwable {
197+
PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
198+
@Override public Publisher<Integer> createPublisher(long elements) {
199+
return illegalEmptyEagerOnCompletePublisher;
200+
}
201+
202+
@Override public long maxElementsFromPublisher() {
203+
return 0; // it is an "empty" Publisher
204+
}
205+
206+
@Override public Publisher<Integer> createErrorStatePublisher() {
207+
return null;
208+
}
209+
};
210+
211+
verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
212+
}
213+
}, "Subscriber::onComplete() called before Subscriber::onSubscribe");
214+
}
215+
187216
@Test
188217
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
189218
requireTestFailure(new ThrowingRunnable() {

0 commit comments

Comments
 (0)