Skip to content

Commit bc13273

Browse files
committed
+tck #284 support "demand when all downstreams demand" Processor in TCK
1 parent 4264e1d commit bc13273

File tree

3 files changed

+195
-23
lines changed

3 files changed

+195
-23
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.reactivestreams.tck.support.Function;
1212
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
1313
import org.reactivestreams.tck.support.PublisherVerificationRules;
14+
import org.reactivestreams.tck.support.TestException;
1415
import org.testng.annotations.BeforeMethod;
1516
import org.testng.annotations.Test;
1617

@@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
387388
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
388389
@Override
389390
public TestSetup apply(Long aLong) throws Throwable {
390-
return new TestSetup(env, processorBufferSize) {{
391+
return new TestSetup(env, processorBufferSize, false) {{
391392
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
392-
env.subscribe(processor, sub1);
393-
394393
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
394+
395+
// connect upstream
396+
env.subscribe(this, processor);
397+
// connect downstreams
398+
env.subscribe(processor, sub1);
399+
sub1.request(2);
395400
env.subscribe(processor, sub2);
401+
sub2.request(1);
396402

397-
sub1.request(1);
403+
// request bubbles up to upstream publisher:
398404
expectRequest();
399405
final T x = sendNextTFromUpstream();
400406
expectNextElement(sub1, x);
401407
sub1.request(1);
402408

403409
// sub1 has received one element, and has one demand pending
404-
// sub2 has not yet requested anything
410+
// sub2 has received one element, and no more pending demand
405411

406-
final Exception ex = new RuntimeException("Test exception");
412+
// if upstream fails, both should get the error signal
413+
final Exception ex = new TestException();
407414
sendError(ex);
408415
sub1.expectError(ex);
409416
sub2.expectError(ex);
@@ -472,11 +479,11 @@ public void onError(Throwable cause) {
472479
// must immediately pass on `onError` events received from its upstream to its downstream
473480
@Test
474481
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
475-
new TestSetup(env, processorBufferSize) {{
482+
new TestSetup(env, processorBufferSize, true) {{
476483
final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
477484
env.subscribe(processor, sub);
478485

479-
final Exception ex = new RuntimeException("Test exception");
486+
final Exception ex = new TestException();
480487
sendError(ex);
481488
sub.expectError(ex); // "immediately", i.e. without a preceding request
482489

@@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong
629636
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
630637
@Override
631638
public TestSetup apply(Long subscribers) throws Throwable {
632-
return new TestSetup(env, processorBufferSize) {{
633-
ManualSubscriber<T> sub1 = newSubscriber();
639+
return new TestSetup(env, processorBufferSize, false) {{
640+
final ManualSubscriber<T> sub1 = newSubscriber();
634641
sub1.request(20);
635642

636643
long totalRequests = expectRequest();
637644
final T x = sendNextTFromUpstream();
638-
expectNextElement(sub1, x);
645+
expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest"
639646

640647
if (totalRequests == 1) {
641648
totalRequests += expectRequest();
@@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher<T> {
700707

701708
final Processor<T, T> processor;
702709

703-
public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
710+
public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException {
704711
super(env);
705712
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
706713
processor = createIdentityProcessor(testBufferSize);
707-
subscribe(processor);
714+
if (subscribeProcessorToEnvPublisher) subscribe(processor);
708715
}
709716

710717
public ManualSubscriber<T> newSubscriber() throws InterruptedException {

Diff for: tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public <T> T flopAndFail(String msg) {
183183
}
184184

185185

186+
public <T> void subscribe(Publisher<T> pub, Subscriber<T> sub) throws InterruptedException {
187+
pub.subscribe(sub);
188+
verifyNoAsyncErrorsNoDelay();
189+
}
186190

187191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
188192
subscribe(pub, sub, defaultTimeoutMillis);

Diff for: tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+171-10
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import org.testng.annotations.BeforeClass;
1010
import org.testng.annotations.Test;
1111

12-
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.Executors;
12+
import java.util.concurrent.*;
13+
import java.util.concurrent.atomic.AtomicLong;
1414

1515
/**
1616
* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
@@ -27,25 +27,36 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2727
@Test
2828
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2929
requireTestSkip(new ThrowingRunnable() {
30-
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
30+
@Override
31+
public void run() throws Throwable {
32+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) {
33+
@Override
34+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3335
return new NoopProcessor();
3436
}
3537

36-
@Override public ExecutorService publisherExecutorService() { return ex; }
38+
@Override
39+
public ExecutorService publisherExecutorService() {
40+
return ex;
41+
}
3742

38-
@Override public Integer createElement(int element) { return element; }
43+
@Override
44+
public Integer createElement(int element) {
45+
return element;
46+
}
3947

40-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
48+
@Override
49+
public Publisher<Integer> createHelperPublisher(long elements) {
4150
return SKIP;
4251
}
4352

44-
@Override public Publisher<Integer> createFailedPublisher() {
53+
@Override
54+
public Publisher<Integer> createFailedPublisher() {
4555
return SKIP;
4656
}
4757

48-
@Override public long maxSupportedSubscribers() {
58+
@Override
59+
public long maxSupportedSubscribers() {
4960
return 1; // can only support 1 subscribe => unable to run this test
5061
}
5162
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
@@ -115,6 +126,156 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
115126
}, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS);
116127
}
117128

129+
@Test
130+
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable {
131+
final TestEnvironment env = newTestEnvironment();
132+
new IdentityProcessorVerification<Integer>(env, DEFAULT_TIMEOUT_MILLIS) {
133+
@Override
134+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0
135+
return new Processor<Integer, Integer>() {
136+
137+
private volatile Subscription upstreamSubscription;
138+
139+
private final CopyOnWriteArrayList<MySubscription> subs = new CopyOnWriteArrayList<MySubscription>();
140+
private final CopyOnWriteArrayList<Subscriber<? super Integer>> subscribers = new CopyOnWriteArrayList<Subscriber<? super Integer>>();
141+
private final AtomicLong demand1 = new AtomicLong();
142+
private final AtomicLong demand2 = new AtomicLong();
143+
private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand
144+
145+
@Override
146+
public void subscribe(final Subscriber<? super Integer> s) {
147+
int subscriberCount = subs.size();
148+
if (subscriberCount == 0) s.onSubscribe(createSubscription(awaitLatch, s, demand1));
149+
else if (subscriberCount == 1) s.onSubscribe(createSubscription(awaitLatch, s, demand2));
150+
else throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
151+
}
152+
153+
@Override
154+
public void onSubscribe(Subscription s) {
155+
this.upstreamSubscription = s;
156+
}
157+
158+
@Override
159+
public void onNext(Integer elem) {
160+
for (Subscriber<? super Integer> subscriber : subscribers) {
161+
try {
162+
subscriber.onNext(elem);
163+
} catch (Exception ex) {
164+
env.flop(ex, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
165+
}
166+
}
167+
}
168+
169+
@Override
170+
public void onError(Throwable t) {
171+
for (Subscriber<? super Integer> subscriber : subscribers) {
172+
try {
173+
subscriber.onError(t);
174+
} catch (Exception ex) {
175+
env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
176+
}
177+
}
178+
}
179+
180+
@Override
181+
public void onComplete() {
182+
for (Subscriber<? super Integer> subscriber : subscribers) {
183+
try {
184+
subscriber.onComplete();
185+
} catch (Exception ex) {
186+
env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
187+
}
188+
}
189+
}
190+
191+
private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber<? super Integer> s, final AtomicLong demand) {
192+
final MySubscription sub = new MySubscription(awaitLatch, s, demand);
193+
subs.add(sub);
194+
subscribers.add(s);
195+
return sub;
196+
}
197+
198+
final class MySubscription implements Subscription {
199+
private final CountDownLatch awaitLatch;
200+
private final Subscriber<? super Integer> s;
201+
private final AtomicLong demand;
202+
203+
public MySubscription(CountDownLatch awaitTwoLatch, Subscriber<? super Integer> s, AtomicLong demand) {
204+
this.awaitLatch = awaitTwoLatch;
205+
this.s = s;
206+
this.demand = demand;
207+
}
208+
209+
@Override
210+
public void request(final long n) {
211+
ex.execute(new Runnable() {
212+
@Override
213+
public void run() {
214+
demand.addAndGet(n); // naive, but good enough for the test
215+
awaitLatch.countDown();
216+
try {
217+
awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
218+
while (demand.getAndDecrement() > 0) {
219+
upstreamSubscription.request(1);
220+
}
221+
} catch (InterruptedException e) {
222+
env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand.");
223+
}
224+
225+
}
226+
});
227+
}
228+
229+
@Override
230+
public void cancel() {
231+
demand.set(Long.MIN_VALUE); // naive but OK for this test
232+
}
233+
234+
@Override
235+
public String toString() {
236+
return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand);
237+
}
238+
}
239+
};
240+
}
241+
242+
@Override
243+
public ExecutorService publisherExecutorService() {
244+
return ex;
245+
}
246+
247+
@Override
248+
public Integer createElement(int element) {
249+
return element;
250+
}
251+
252+
@Override
253+
public Publisher<Integer> createHelperPublisher(long elements) {
254+
return new Publisher<Integer>() {
255+
@Override
256+
public void subscribe(final Subscriber<? super Integer> s) {
257+
s.onSubscribe(new NoopSubscription() {
258+
@Override public void request(long n) {
259+
ex.execute(new Runnable() {
260+
@Override public void run() {
261+
for (int i = 0; i < 10; i++) {
262+
s.onNext(i);
263+
}
264+
}
265+
});
266+
}
267+
});
268+
}
269+
};
270+
}
271+
272+
@Override
273+
public Publisher<Integer> createFailedPublisher() {
274+
return SKIP;
275+
}
276+
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
277+
}
278+
118279
// FAILING IMPLEMENTATIONS //
119280

120281
final Publisher<Integer> SKIP = null;

0 commit comments

Comments
 (0)