-
Notifications
You must be signed in to change notification settings - Fork 534
Fix missing cancel() in tests that don't consume the entire source #346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -212,17 +212,20 @@ public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfPr | |
public void run(Publisher<T> pub) throws InterruptedException { | ||
|
||
ManualSubscriber<T> sub = env.newManualSubscriber(pub); | ||
|
||
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); | ||
sub.request(1); | ||
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); | ||
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); | ||
|
||
sub.request(1); | ||
sub.request(2); | ||
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); | ||
|
||
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); | ||
try { | ||
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); | ||
sub.request(1); | ||
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); | ||
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); | ||
|
||
sub.request(1); | ||
sub.request(2); | ||
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); | ||
|
||
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); | ||
} finally { | ||
sub.cancel(); | ||
} | ||
} | ||
}); | ||
} | ||
|
@@ -486,30 +489,39 @@ public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws T | |
@Override | ||
public void run(Publisher<T> pub) throws Throwable { | ||
final Latch onSubscribeLatch = new Latch(env); | ||
pub.subscribe(new Subscriber<T>() { | ||
@Override | ||
public void onError(Throwable cause) { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription subs) { | ||
onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); | ||
onSubscribeLatch.close(); | ||
} | ||
|
||
@Override | ||
public void onNext(T elem) { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); | ||
final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>(); | ||
try { | ||
pub.subscribe(new Subscriber<T>() { | ||
@Override | ||
public void onError(Throwable cause) { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription subs) { | ||
cancel.set(subs); | ||
onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); | ||
onSubscribeLatch.close(); | ||
} | ||
|
||
@Override | ||
public void onNext(T elem) { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); | ||
} | ||
}); | ||
onSubscribeLatch.expectClose("Should have received onSubscribe"); | ||
env.verifyNoAsyncErrorsNoDelay(); | ||
} finally { | ||
Subscription s = cancel.getAndSet(null); | ||
if (s != null) { | ||
s.cancel(); | ||
} | ||
}); | ||
onSubscribeLatch.expectClose("Should have received onSubscribe"); | ||
env.verifyNoAsyncErrorsNoDelay(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
}); | ||
} | ||
|
@@ -560,7 +572,15 @@ public void run(Publisher<T> pub) throws Throwable { | |
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); | ||
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); | ||
|
||
env.verifyNoAsyncErrors(); | ||
try { | ||
env.verifyNoAsyncErrors(); | ||
} finally { | ||
try { | ||
sub1.cancel(); | ||
} finally { | ||
sub2.cancel(); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/************************************************************************ | ||
* Licensed under Public Domain (CC0) * | ||
* * | ||
* To the extent possible under law, the person who associated CC0 with * | ||
* this code has waived all copyright and related or neighboring * | ||
* rights to this code. * | ||
* * | ||
* You should have received a copy of the CC0 legalcode along with this * | ||
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* | ||
************************************************************************/ | ||
|
||
package org.reactivestreams.tck; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.*; | ||
|
||
import org.reactivestreams.*; | ||
import org.testng.annotations.*; | ||
|
||
@Test | ||
public class RangePublisherTest extends PublisherVerification<Integer> { | ||
|
||
static final Map<Integer, StackTraceElement[]> stacks = new ConcurrentHashMap<Integer, StackTraceElement[]>(); | ||
|
||
static final Map<Integer, Boolean> states = new ConcurrentHashMap<Integer, Boolean>(); | ||
|
||
static final AtomicInteger id = new AtomicInteger(); | ||
|
||
@AfterClass | ||
public static void afterClass() { | ||
boolean fail = false; | ||
StringBuilder b = new StringBuilder(); | ||
for (Map.Entry<Integer, Boolean> t : states.entrySet()) { | ||
if (!t.getValue()) { | ||
b.append("\r\n-------------------------------"); | ||
for (Object o : stacks.get(t.getKey())) { | ||
b.append("\r\nat ").append(o); | ||
} | ||
fail = true; | ||
} | ||
} | ||
if (fail) { | ||
throw new AssertionError("Cancellations were missing:" + b); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Useful, thanks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice work, @akarnokd! |
||
|
||
public RangePublisherTest() { | ||
super(new TestEnvironment()); | ||
} | ||
|
||
@Override | ||
public Publisher<Integer> createPublisher(long elements) { | ||
return new RangePublisher(1, elements); | ||
} | ||
|
||
@Override | ||
public Publisher<Integer> createFailedPublisher() { | ||
return null; | ||
} | ||
|
||
static final class RangePublisher | ||
implements Publisher<Integer> { | ||
|
||
final StackTraceElement[] stacktrace; | ||
|
||
final long start; | ||
|
||
final long count; | ||
|
||
RangePublisher(long start, long count) { | ||
this.stacktrace = Thread.currentThread().getStackTrace(); | ||
this.start = start; | ||
this.count = count; | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super Integer> s) { | ||
if (s == null) { | ||
throw new NullPointerException(); | ||
} | ||
|
||
int ids = id.incrementAndGet(); | ||
|
||
RangeSubscription parent = new RangeSubscription(s, ids, start, start + count); | ||
stacks.put(ids, stacktrace); | ||
states.put(ids, false); | ||
s.onSubscribe(parent); | ||
} | ||
|
||
static final class RangeSubscription extends AtomicLong implements Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be of greater utility to have it be an IteratorSubscription? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main purpose was to track cancellation/completion in response to the TCK and this was the simplest multi-valued Working with an iterator requires a lot of try-catches around the method calls to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @akarnokd Fair enough, we can leave that to another time. :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you meant an example There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @akarnokd No, not an example. I thought it might be iteresting for the TCK to have a sync publisher which can be supplied an Iterable :) |
||
|
||
private static final long serialVersionUID = 9066221863682220604L; | ||
|
||
final Subscriber<? super Integer> actual; | ||
|
||
final int ids; | ||
|
||
final long end; | ||
|
||
long index; | ||
|
||
volatile boolean cancelled; | ||
|
||
RangeSubscription(Subscriber<? super Integer> actual, int ids, long start, long end) { | ||
this.actual = actual; | ||
this.ids = ids; | ||
this.index = start; | ||
this.end = end; | ||
} | ||
|
||
@Override | ||
public void request(long n) { | ||
if (!cancelled) { | ||
if (n <= 0L) { | ||
cancelled = true; | ||
states.put(ids, true); | ||
actual.onError(new IllegalArgumentException("§3.9 violated")); | ||
return; | ||
} | ||
|
||
for (;;) { | ||
long r = get(); | ||
long u = r + n; | ||
if (u < 0L) { | ||
u = Long.MAX_VALUE; | ||
} | ||
if (compareAndSet(r, u)) { | ||
if (r == 0) { | ||
break; | ||
} | ||
return; | ||
} | ||
} | ||
|
||
long idx = index; | ||
long f = end; | ||
|
||
for (;;) { | ||
long e = 0; | ||
while (e != n && idx != f) { | ||
if (cancelled) { | ||
return; | ||
} | ||
|
||
actual.onNext((int)idx); | ||
|
||
idx++; | ||
e++; | ||
} | ||
|
||
if (idx == f) { | ||
if (!cancelled) { | ||
states.put(ids, true); | ||
actual.onComplete(); | ||
} | ||
return; | ||
} | ||
|
||
index = idx; | ||
n = addAndGet(-n); | ||
if (n == 0) { | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
cancelled = true; | ||
states.put(ids, true); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍