Skip to content

Commit 74ae8b8

Browse files
committed
Fix missing cancel() from in tests that don't consume the entire source
1 parent cdedc8f commit 74ae8b8

File tree

2 files changed

+231
-35
lines changed

2 files changed

+231
-35
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+55-35
Original file line numberDiff line numberDiff line change
@@ -212,17 +212,20 @@ public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfPr
212212
public void run(Publisher<T> pub) throws InterruptedException {
213213

214214
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
215-
216-
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
217-
sub.request(1);
218-
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
219-
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
220-
221-
sub.request(1);
222-
sub.request(2);
223-
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
224-
225-
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
215+
try {
216+
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
217+
sub.request(1);
218+
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
219+
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
220+
221+
sub.request(1);
222+
sub.request(2);
223+
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
224+
225+
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
226+
} finally {
227+
sub.cancel();
228+
}
226229
}
227230
});
228231
}
@@ -486,30 +489,39 @@ public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws T
486489
@Override
487490
public void run(Publisher<T> pub) throws Throwable {
488491
final Latch onSubscribeLatch = new Latch(env);
489-
pub.subscribe(new Subscriber<T>() {
490-
@Override
491-
public void onError(Throwable cause) {
492-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
493-
}
494-
495-
@Override
496-
public void onSubscribe(Subscription subs) {
497-
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
498-
onSubscribeLatch.close();
499-
}
500-
501-
@Override
502-
public void onNext(T elem) {
503-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
504-
}
505-
506-
@Override
507-
public void onComplete() {
508-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
492+
final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
493+
try {
494+
pub.subscribe(new Subscriber<T>() {
495+
@Override
496+
public void onError(Throwable cause) {
497+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
498+
}
499+
500+
@Override
501+
public void onSubscribe(Subscription subs) {
502+
cancel.set(subs);
503+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
504+
onSubscribeLatch.close();
505+
}
506+
507+
@Override
508+
public void onNext(T elem) {
509+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
510+
}
511+
512+
@Override
513+
public void onComplete() {
514+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
515+
}
516+
});
517+
onSubscribeLatch.expectClose("Should have received onSubscribe");
518+
env.verifyNoAsyncErrorsNoDelay();
519+
} finally {
520+
Subscription s = cancel.getAndSet(null);
521+
if (s != null) {
522+
s.cancel();
509523
}
510-
});
511-
onSubscribeLatch.expectClose("Should have received onSubscribe");
512-
env.verifyNoAsyncErrorsNoDelay();
524+
}
513525
}
514526
});
515527
}
@@ -560,7 +572,15 @@ public void run(Publisher<T> pub) throws Throwable {
560572
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
561573
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
562574

563-
env.verifyNoAsyncErrors();
575+
try {
576+
env.verifyNoAsyncErrors();
577+
} finally {
578+
try {
579+
sub1.cancel();
580+
} finally {
581+
sub2.cancel();
582+
}
583+
}
564584
}
565585
});
566586
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.tck;
13+
14+
import java.util.Map;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
import org.testng.annotations.*;
20+
21+
@Test
22+
public class RangePublisherTest extends PublisherVerification<Integer> {
23+
24+
static final Map<Integer, StackTraceElement[]> stacks = new ConcurrentHashMap<Integer, StackTraceElement[]>();
25+
26+
static final Map<Integer, Boolean> states = new ConcurrentHashMap<Integer, Boolean>();
27+
28+
static final AtomicInteger id = new AtomicInteger();
29+
30+
@AfterClass
31+
public static void afterClass() {
32+
boolean fail = false;
33+
StringBuilder b = new StringBuilder();
34+
for (Map.Entry<Integer, Boolean> t : states.entrySet()) {
35+
if (!t.getValue()) {
36+
b.append("\r\n-------------------------------");
37+
for (Object o : stacks.get(t.getKey())) {
38+
b.append("\r\nat ").append(o);
39+
}
40+
fail = true;
41+
}
42+
}
43+
if (fail) {
44+
throw new AssertionError("Cancellations were missing:" + b);
45+
}
46+
}
47+
48+
public RangePublisherTest() {
49+
super(new TestEnvironment(25));
50+
}
51+
52+
@Override
53+
public Publisher<Integer> createPublisher(long elements) {
54+
return new RangePublisher(1, elements);
55+
}
56+
57+
@Override
58+
public Publisher<Integer> createFailedPublisher() {
59+
return null;
60+
}
61+
62+
static final class RangePublisher
63+
implements Publisher<Integer> {
64+
65+
final StackTraceElement[] stacktrace;
66+
67+
final long start;
68+
69+
final long count;
70+
71+
RangePublisher(long start, long count) {
72+
this.stacktrace = Thread.currentThread().getStackTrace();
73+
this.start = start;
74+
this.count = count;
75+
}
76+
77+
@Override
78+
public void subscribe(Subscriber<? super Integer> s) {
79+
if (s == null) {
80+
throw new NullPointerException();
81+
}
82+
83+
int ids = id.incrementAndGet();
84+
85+
RangeSubscription parent = new RangeSubscription(s, ids, start, start + count);
86+
stacks.put(ids, stacktrace);
87+
states.put(ids, false);
88+
s.onSubscribe(parent);
89+
}
90+
91+
static final class RangeSubscription extends AtomicLong implements Subscription {
92+
93+
private static final long serialVersionUID = 9066221863682220604L;
94+
95+
final Subscriber<? super Integer> actual;
96+
97+
final int ids;
98+
99+
final long end;
100+
101+
long index;
102+
103+
volatile boolean cancelled;
104+
105+
RangeSubscription(Subscriber<? super Integer> actual, int ids, long start, long end) {
106+
this.actual = actual;
107+
this.ids = ids;
108+
this.index = start;
109+
this.end = end;
110+
}
111+
112+
@Override
113+
public void request(long n) {
114+
if (!cancelled) {
115+
if (n <= 0L) {
116+
cancelled = true;
117+
states.put(ids, true);
118+
actual.onError(new IllegalArgumentException("§3.9 violated"));
119+
return;
120+
}
121+
122+
for (;;) {
123+
long r = get();
124+
long u = r + n;
125+
if (u < 0L) {
126+
u = Long.MAX_VALUE;
127+
}
128+
if (compareAndSet(r, u)) {
129+
if (r == 0) {
130+
break;
131+
}
132+
return;
133+
}
134+
}
135+
136+
long idx = index;
137+
long f = end;
138+
139+
for (;;) {
140+
long e = 0;
141+
while (e != n && idx != f) {
142+
if (cancelled) {
143+
return;
144+
}
145+
146+
actual.onNext((int)idx);
147+
148+
idx++;
149+
e++;
150+
}
151+
152+
if (idx == f) {
153+
if (!cancelled) {
154+
states.put(ids, true);
155+
actual.onComplete();
156+
}
157+
return;
158+
}
159+
160+
index = idx;
161+
n = addAndGet(-n);
162+
if (n == 0) {
163+
break;
164+
}
165+
}
166+
}
167+
}
168+
169+
@Override
170+
public void cancel() {
171+
cancelled = true;
172+
states.put(ids, true);
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)