Skip to content

remove requirement to track demand beyond Long.MAX_VALUE #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 52 additions & 52 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ final class SubscriptionImpl implements Subscription, Runnable {
private void doRequest(final long n) {
if (n < 1)
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
else if (demand + n < 1)
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 3.17 by demanding more elements than Long.MAX_VALUE."));
else {
else if (demand + n < 1) {
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded"
doSend(); // Then we proceed with sending data downstream
} else {
demand += n; // Here we record the downstream demand
doSend(); // Then we can proceed with sending data downstream
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
}

@Override @Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
publisherVerification.required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,25 +940,30 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
sub.request(1); // pending = Long.MAX_VALUE

sub.nextElements(totalElements);
sub.expectCompletion();

env.verifyNoAsyncErrors();
try {
env.verifyNoAsyncErrors();
} finally {
sub.cancel();
}

}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.17
@Override @Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
@Override public void run(Publisher<T> pub) throws Throwable {
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
// arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
// so 10 is relatively high and safe even if arbitrarily chosen
int callsCounter = 10;
Expand All @@ -982,10 +987,12 @@ public void onNext(T element) {
// we're pretty sure to overflow from those
sub.request(1);

sub.expectErrorWithMessage(IllegalStateException.class, "3.17");

// onError must be signalled only once, even with in-flight other request() messages that would trigger overflow again
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
// no onError should be signalled
try {
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
} finally {
sub.cancel();
}
}
});
}
Expand Down Expand Up @@ -1105,4 +1112,4 @@ public void notVerified(String message) {
throw new SkipException(message);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface PublisherVerificationRules {
void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable;
void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,17 +473,7 @@ public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue_sho
}

@Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onAsynchDemandIgnoringPublisher() throws Throwable {
final ExecutorService signallersPool = Executors.newFixedThreadPool(2);
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
demandIgnoringAsynchronousPublisherVerification(signallersPool).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
}
}, "Expected onError(java.lang.IllegalStateException)");
}

@Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchDemandIgnoringPublisher() throws Throwable {
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchOverflowingPublisher() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -494,20 +484,26 @@ public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shoul
@Override public void request(long n) {
// it does not protect from demand overflow!
demand += n;
if (demand < 0) {
// overflow
s.onError(new IllegalStateException("Illegally signalling onError (violates rule 3.17)")); // Illegally signal error
} else {
s.onNext(0);
}
}

@Override public void cancel() {
// noop
}
});
}
}).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
}).required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
}
}, "Expected onError(java.lang.IllegalStateException)");
}, "Async error during test execution: Illegally signalling onError (violates rule 3.17)");
}

@Test
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFail_overflowingDemand() throws Throwable {
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFailWhenErrorSignalledOnceMaxValueReached() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -520,15 +516,16 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa

// this is a mistake, it should still be able to accumulate such demand
if (demand == Long.MAX_VALUE)
s.onError(new IllegalStateException("I'm signalling onError too soon! Cumulative demand equal to Long.MAX_VALUE is OK by the spec."));
s.onError(new IllegalStateException("Illegally signalling onError too soon! " +
"Cumulative demand equal to Long.MAX_VALUE is legal."));

s.onNext(0);
}
});
}
}).required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
}
}, "Async error during test execution: I'm signalling onError too soon!");
}, "Async error during test execution: Illegally signalling onError too soon!");
}


Expand Down