Skip to content

Commit e930022

Browse files
committed
Add AsyncRangePublisherTest which demonstrates TCK bug fix
1 parent f71f9ea commit e930022

File tree

1 file changed

+175
-0
lines changed

1 file changed

+175
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.tck.PublisherVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.BlockingQueue;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.LinkedBlockingQueue;
17+
18+
@Test // Must be here for TestNG to find and run this, do not remove
19+
public class AsyncRangePublisherTest extends PublisherVerification<Integer> {
20+
private static final int TERMINAL_DELAY_MS = 200;
21+
private static final int DEFAULT_TIMEOUT_MS = TERMINAL_DELAY_MS * 4;
22+
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2;
23+
private ExecutorService e;
24+
@BeforeClass
25+
void before() { e = Executors.newCachedThreadPool(); }
26+
@AfterClass
27+
void after() { if (e != null) e.shutdown(); }
28+
29+
public AsyncRangePublisherTest() {
30+
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS));
31+
}
32+
33+
@Override
34+
public Publisher<Integer> createPublisher(long elements) {
35+
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e);
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
return null;
41+
}
42+
43+
private static final class AsyncPublisher<T> implements Publisher<T> {
44+
private final Publisher<T> original;
45+
private final Executor executor;
46+
47+
private AsyncPublisher(Publisher<T> original, Executor executor) {
48+
this.original = requireNonNull(original);
49+
this.executor = requireNonNull(executor);
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super T> s) {
54+
original.subscribe(new AsyncSubscriber<T>(requireNonNull(s), executor));
55+
}
56+
57+
private static final class AsyncSubscriber<T> implements Subscriber<T> {
58+
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>();
59+
60+
private AsyncSubscriber(final Subscriber<? super T> original, final Executor executor) {
61+
try {
62+
executor.execute(new Runnable() {
63+
@Override
64+
public void run() {
65+
for (; ; ) {
66+
try {
67+
final Object signal = signalQueue.take();
68+
if (signal instanceof Cancelled) {
69+
return;
70+
} else if (signal instanceof TerminalSignal) {
71+
Thread.sleep(TERMINAL_DELAY_MS);
72+
TerminalSignal terminalSignal = (TerminalSignal) signal;
73+
if (terminalSignal.cause == null) {
74+
original.onComplete();
75+
} else {
76+
original.onError(terminalSignal.cause);
77+
}
78+
return;
79+
} else if (signal instanceof OnSubscribeSignal) {
80+
original.onSubscribe(((OnSubscribeSignal) signal).subscription);
81+
} else {
82+
@SuppressWarnings("unchecked")
83+
final T onNextSignal = ((OnNextSignal<T>) signal).onNext;
84+
original.onNext(onNextSignal);
85+
}
86+
} catch (InterruptedException ex) {
87+
throw new RuntimeException(ex);
88+
}
89+
}
90+
}
91+
});
92+
} catch (Throwable cause) {
93+
original.onSubscribe(new Subscription() {
94+
@Override
95+
public void request(long n) {
96+
}
97+
98+
@Override
99+
public void cancel() {
100+
}
101+
});
102+
original.onError(new IllegalStateException("Executor rejected", cause));
103+
}
104+
}
105+
106+
@Override
107+
public void onSubscribe(final Subscription s) {
108+
signalQueue.add(new OnSubscribeSignal(new Subscription() {
109+
@Override
110+
public void request(long n) {
111+
s.request(n);
112+
}
113+
114+
@Override
115+
public void cancel() {
116+
try {
117+
s.cancel();
118+
} finally {
119+
signalQueue.add(new Cancelled());
120+
}
121+
}
122+
}));
123+
}
124+
125+
@Override
126+
public void onNext(T t) {
127+
signalQueue.add(new OnNextSignal<T>(t));
128+
}
129+
130+
@Override
131+
public void onError(Throwable t) {
132+
signalQueue.add(new TerminalSignal(requireNonNull(t)));
133+
}
134+
135+
@Override
136+
public void onComplete() {
137+
signalQueue.add(new TerminalSignal(null));
138+
}
139+
}
140+
141+
private static final class TerminalSignal {
142+
private final Throwable cause;
143+
144+
private TerminalSignal(Throwable cause) {
145+
this.cause = cause;
146+
}
147+
}
148+
149+
private static final class OnSubscribeSignal {
150+
private final Subscription subscription;
151+
152+
private OnSubscribeSignal(Subscription subscription) {
153+
this.subscription = subscription;
154+
}
155+
}
156+
157+
private static final class OnNextSignal<T> {
158+
private final T onNext;
159+
160+
private OnNextSignal(T onNext) {
161+
this.onNext = onNext;
162+
}
163+
}
164+
165+
private static final class Cancelled {
166+
}
167+
168+
private static <T> T requireNonNull(T o) {
169+
if (o == null) {
170+
throw new NullPointerException();
171+
}
172+
return o;
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)