Skip to content

Commit e521dbc

Browse files
committed
Merge pull request reactive-streams#203 from reactive-streams/wip-196-RK
remove requirement to track demand beyond Long.MAX_VALUE
2 parents 13ddd32 + a3005b7 commit e521dbc

File tree

6 files changed

+89
-83
lines changed

6 files changed

+89
-83
lines changed

README.md

+52-52
Large diffs are not rendered by default.

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ final class SubscriptionImpl implements Subscription, Runnable {
8181
private void doRequest(final long n) {
8282
if (n < 1)
8383
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
84-
else if (demand + n < 1)
85-
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 3.17 by demanding more elements than Long.MAX_VALUE."));
86-
else {
84+
else if (demand + n < 1) {
85+
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
86+
demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded"
87+
doSend(); // Then we proceed with sending data downstream
88+
} else {
8789
demand += n; // Here we record the downstream demand
8890
doSend(); // Then we can proceed with sending data downstream
8991
}

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
362362
}
363363

364364
@Override @Test
365-
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
366-
publisherVerification.required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
365+
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
366+
publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
367367
}
368368

369369
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -940,25 +940,30 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
940940
activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
941941
@Override
942942
public void run(Publisher<T> pub) throws Throwable {
943-
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
943+
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
944944
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
945945
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
946946
sub.request(1); // pending = Long.MAX_VALUE
947947

948948
sub.nextElements(totalElements);
949949
sub.expectCompletion();
950950

951-
env.verifyNoAsyncErrors();
951+
try {
952+
env.verifyNoAsyncErrors();
953+
} finally {
954+
sub.cancel();
955+
}
956+
952957
}
953958
});
954959
}
955960

956961
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.17
957962
@Override @Test
958-
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
963+
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
959964
activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
960965
@Override public void run(Publisher<T> pub) throws Throwable {
961-
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
966+
final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
962967
// arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
963968
// so 10 is relatively high and safe even if arbitrarily chosen
964969
int callsCounter = 10;
@@ -982,10 +987,12 @@ public void onNext(T element) {
982987
// we're pretty sure to overflow from those
983988
sub.request(1);
984989

985-
sub.expectErrorWithMessage(IllegalStateException.class, "3.17");
986-
987-
// onError must be signalled only once, even with in-flight other request() messages that would trigger overflow again
988-
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
990+
// no onError should be signalled
991+
try {
992+
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
993+
} finally {
994+
sub.cancel();
995+
}
989996
}
990997
});
991998
}
@@ -1105,4 +1112,4 @@ public void notVerified(String message) {
11051112
throw new SkipException(message);
11061113
}
11071114

1108-
}
1115+
}

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ public interface PublisherVerificationRules {
3939
void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable;
4040
void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable;
4141
void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable;
42-
void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
42+
void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
4343
}

tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

+13-16
Original file line numberDiff line numberDiff line change
@@ -473,17 +473,7 @@ public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue_sho
473473
}
474474

475475
@Test
476-
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onAsynchDemandIgnoringPublisher() throws Throwable {
477-
final ExecutorService signallersPool = Executors.newFixedThreadPool(2);
478-
requireTestFailure(new ThrowingRunnable() {
479-
@Override public void run() throws Throwable {
480-
demandIgnoringAsynchronousPublisherVerification(signallersPool).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
481-
}
482-
}, "Expected onError(java.lang.IllegalStateException)");
483-
}
484-
485-
@Test
486-
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchDemandIgnoringPublisher() throws Throwable {
476+
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchOverflowingPublisher() throws Throwable {
487477
requireTestFailure(new ThrowingRunnable() {
488478
@Override public void run() throws Throwable {
489479
customPublisherVerification(new Publisher<Integer>() {
@@ -494,20 +484,26 @@ public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shoul
494484
@Override public void request(long n) {
495485
// it does not protect from demand overflow!
496486
demand += n;
487+
if (demand < 0) {
488+
// overflow
489+
s.onError(new IllegalStateException("Illegally signalling onError (violates rule 3.17)")); // Illegally signal error
490+
} else {
491+
s.onNext(0);
492+
}
497493
}
498494

499495
@Override public void cancel() {
500496
// noop
501497
}
502498
});
503499
}
504-
}).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
500+
}).required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
505501
}
506-
}, "Expected onError(java.lang.IllegalStateException)");
502+
}, "Async error during test execution: Illegally signalling onError (violates rule 3.17)");
507503
}
508504

509505
@Test
510-
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFail_overflowingDemand() throws Throwable {
506+
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFailWhenErrorSignalledOnceMaxValueReached() throws Throwable {
511507
requireTestFailure(new ThrowingRunnable() {
512508
@Override public void run() throws Throwable {
513509
customPublisherVerification(new Publisher<Integer>() {
@@ -520,15 +516,16 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
520516

521517
// this is a mistake, it should still be able to accumulate such demand
522518
if (demand == Long.MAX_VALUE)
523-
s.onError(new IllegalStateException("I'm signalling onError too soon! Cumulative demand equal to Long.MAX_VALUE is OK by the spec."));
519+
s.onError(new IllegalStateException("Illegally signalling onError too soon! " +
520+
"Cumulative demand equal to Long.MAX_VALUE is legal."));
524521

525522
s.onNext(0);
526523
}
527524
});
528525
}
529526
}).required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
530527
}
531-
}, "Async error during test execution: I'm signalling onError too soon!");
528+
}, "Async error during test execution: Illegally signalling onError too soon!");
532529
}
533530

534531

0 commit comments

Comments
 (0)