Skip to content

Commit f3883f9

Browse files
committed
+tck reactive-streams#284 support "demand when all downstreams demand" Processor in TCK
1 parent 4264e1d commit f3883f9

File tree

3 files changed

+188
-27
lines changed

3 files changed

+188
-27
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.reactivestreams.tck.support.Function;
1212
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
1313
import org.reactivestreams.tck.support.PublisherVerificationRules;
14+
import org.reactivestreams.tck.support.TestException;
1415
import org.testng.annotations.BeforeMethod;
1516
import org.testng.annotations.Test;
1617

@@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
387388
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
388389
@Override
389390
public TestSetup apply(Long aLong) throws Throwable {
390-
return new TestSetup(env, processorBufferSize) {{
391+
return new TestSetup(env, processorBufferSize, false) {{
391392
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
392-
env.subscribe(processor, sub1);
393-
394393
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
394+
395+
// connect upstream
396+
env.subscribe(this, processor);
397+
// connect downstreams
398+
env.subscribe(processor, sub1);
399+
sub1.request(2);
395400
env.subscribe(processor, sub2);
401+
sub2.request(1);
396402

397-
sub1.request(1);
403+
// request bubbles up to upstream publisher:
398404
expectRequest();
399405
final T x = sendNextTFromUpstream();
400406
expectNextElement(sub1, x);
401407
sub1.request(1);
402408

403409
// sub1 has received one element, and has one demand pending
404-
// sub2 has not yet requested anything
410+
// sub2 has received one element, and no more pending demand
405411

406-
final Exception ex = new RuntimeException("Test exception");
412+
// if upstream fails, both should get the error signal
413+
final Exception ex = new TestException();
407414
sendError(ex);
408415
sub1.expectError(ex);
409416
sub2.expectError(ex);
@@ -472,11 +479,11 @@ public void onError(Throwable cause) {
472479
// must immediately pass on `onError` events received from its upstream to its downstream
473480
@Test
474481
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
475-
new TestSetup(env, processorBufferSize) {{
482+
new TestSetup(env, processorBufferSize, true) {{
476483
final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
477484
env.subscribe(processor, sub);
478485

479-
final Exception ex = new RuntimeException("Test exception");
486+
final Exception ex = new TestException();
480487
sendError(ex);
481488
sub.expectError(ex); // "immediately", i.e. without a preceding request
482489

@@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong
629636
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
630637
@Override
631638
public TestSetup apply(Long subscribers) throws Throwable {
632-
return new TestSetup(env, processorBufferSize) {{
633-
ManualSubscriber<T> sub1 = newSubscriber();
639+
return new TestSetup(env, processorBufferSize, false) {{
640+
final ManualSubscriber<T> sub1 = newSubscriber();
634641
sub1.request(20);
635642

636643
long totalRequests = expectRequest();
637644
final T x = sendNextTFromUpstream();
638-
expectNextElement(sub1, x);
645+
expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest"
639646

640647
if (totalRequests == 1) {
641648
totalRequests += expectRequest();
@@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher<T> {
700707

701708
final Processor<T, T> processor;
702709

703-
public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
710+
public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException {
704711
super(env);
705712
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
706713
processor = createIdentityProcessor(testBufferSize);
707-
subscribe(processor);
714+
if (subscribeProcessorToEnvPublisher) subscribe(processor);
708715
}
709716

710717
public ManualSubscriber<T> newSubscriber() throws InterruptedException {

Diff for: tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public <T> T flopAndFail(String msg) {
183183
}
184184

185185

186+
public <T> void subscribe(Publisher<T> pub, Subscriber<T> sub) throws InterruptedException {
187+
pub.subscribe(sub);
188+
verifyNoAsyncErrorsNoDelay();
189+
}
186190

187191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
188192
subscribe(pub, sub, defaultTimeoutMillis);

Diff for: tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+164-14
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import org.testng.annotations.BeforeClass;
1010
import org.testng.annotations.Test;
1111

12-
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.Executors;
12+
import java.util.concurrent.*;
13+
import java.util.concurrent.atomic.AtomicLong;
1414

1515
/**
1616
* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
@@ -27,25 +27,28 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2727
@Test
2828
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2929
requireTestSkip(new ThrowingRunnable() {
30-
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
30+
@Override
31+
public void run() throws Throwable {
32+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) {
33+
@Override
34+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3335
return new NoopProcessor();
3436
}
3537

36-
@Override public ExecutorService publisherExecutorService() { return ex; }
38+
@Override
39+
public ExecutorService publisherExecutorService() { return ex; }
3740

38-
@Override public Integer createElement(int element) { return element; }
41+
@Override
42+
public Integer createElement(int element) { return element; }
3943

40-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
41-
return SKIP;
42-
}
44+
@Override
45+
public Publisher<Integer> createHelperPublisher(long elements) { return SKIP; }
4346

44-
@Override public Publisher<Integer> createFailedPublisher() {
45-
return SKIP;
46-
}
47+
@Override
48+
public Publisher<Integer> createFailedPublisher() { return SKIP; }
4749

48-
@Override public long maxSupportedSubscribers() {
50+
@Override
51+
public long maxSupportedSubscribers() {
4952
return 1; // can only support 1 subscribe => unable to run this test
5053
}
5154
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
@@ -115,6 +118,153 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
115118
}, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS);
116119
}
117120

121+
@Test
122+
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable {
123+
final TestEnvironment env = newTestEnvironment();
124+
new IdentityProcessorVerification<Integer>(env, DEFAULT_TIMEOUT_MILLIS) {
125+
@Override
126+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0
127+
return new Processor<Integer, Integer>() {
128+
129+
private volatile Subscription upstreamSubscription;
130+
131+
private final CopyOnWriteArrayList<MySubscription> subs = new CopyOnWriteArrayList<MySubscription>();
132+
private final CopyOnWriteArrayList<Subscriber<? super Integer>> subscribers = new CopyOnWriteArrayList<Subscriber<? super Integer>>();
133+
private final AtomicLong demand1 = new AtomicLong();
134+
private final AtomicLong demand2 = new AtomicLong();
135+
private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand
136+
137+
@Override
138+
public void subscribe(final Subscriber<? super Integer> s) {
139+
int subscriberCount = subs.size();
140+
if (subscriberCount == 0) s.onSubscribe(createSubscription(awaitLatch, s, demand1));
141+
else if (subscriberCount == 1) s.onSubscribe(createSubscription(awaitLatch, s, demand2));
142+
else throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
143+
}
144+
145+
@Override
146+
public void onSubscribe(Subscription s) {
147+
this.upstreamSubscription = s;
148+
}
149+
150+
@Override
151+
public void onNext(Integer elem) {
152+
for (Subscriber<? super Integer> subscriber : subscribers) {
153+
try {
154+
subscriber.onNext(elem);
155+
} catch (Exception ex) {
156+
env.flop(ex, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
157+
}
158+
}
159+
}
160+
161+
@Override
162+
public void onError(Throwable t) {
163+
for (Subscriber<? super Integer> subscriber : subscribers) {
164+
try {
165+
subscriber.onError(t);
166+
} catch (Exception ex) {
167+
env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
168+
}
169+
}
170+
}
171+
172+
@Override
173+
public void onComplete() {
174+
for (Subscriber<? super Integer> subscriber : subscribers) {
175+
try {
176+
subscriber.onComplete();
177+
} catch (Exception ex) {
178+
env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
179+
}
180+
}
181+
}
182+
183+
private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber<? super Integer> s, final AtomicLong demand) {
184+
final MySubscription sub = new MySubscription(awaitLatch, s, demand);
185+
subs.add(sub);
186+
subscribers.add(s);
187+
return sub;
188+
}
189+
190+
final class MySubscription implements Subscription {
191+
private final CountDownLatch awaitLatch;
192+
private final Subscriber<? super Integer> s;
193+
private final AtomicLong demand;
194+
195+
public MySubscription(CountDownLatch awaitTwoLatch, Subscriber<? super Integer> s, AtomicLong demand) {
196+
this.awaitLatch = awaitTwoLatch;
197+
this.s = s;
198+
this.demand = demand;
199+
}
200+
201+
@Override
202+
public void request(final long n) {
203+
new Thread(new Runnable() {
204+
@Override
205+
public void run() {
206+
demand.addAndGet(n); // naive, but good enough for the test
207+
awaitLatch.countDown();
208+
try {
209+
awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
210+
while (demand.getAndDecrement() > 0) {
211+
upstreamSubscription.request(1);
212+
}
213+
} catch (InterruptedException e) {
214+
env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand.");
215+
}
216+
217+
}
218+
}).start();
219+
}
220+
221+
@Override
222+
public void cancel() {
223+
demand.set(Long.MIN_VALUE); // naive but OK for this test
224+
}
225+
226+
@Override
227+
public String toString() {
228+
return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand);
229+
}
230+
}
231+
};
232+
}
233+
234+
@Override
235+
public ExecutorService publisherExecutorService() {
236+
return ex;
237+
}
238+
239+
@Override
240+
public Integer createElement(int element) {
241+
return element;
242+
}
243+
244+
@Override
245+
public Publisher<Integer> createHelperPublisher(long elements) {
246+
return new Publisher<Integer>() {
247+
@Override
248+
public void subscribe(final Subscriber<? super Integer> s) {
249+
s.onSubscribe(new NoopSubscription() {
250+
@Override
251+
public void request(long n) {
252+
for (int i = 0; i < 10; i++) {
253+
s.onNext(i);
254+
}
255+
}
256+
});
257+
}
258+
};
259+
}
260+
261+
@Override
262+
public Publisher<Integer> createFailedPublisher() {
263+
return SKIP;
264+
}
265+
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
266+
}
267+
118268
// FAILING IMPLEMENTATIONS //
119269

120270
final Publisher<Integer> SKIP = null;

0 commit comments

Comments
 (0)