Skip to content

Commit 88fcda3

Browse files
Infinite request with Long.MAX_VALUE instead of -1
Migrating to this after discussions at reactive-streams/reactive-streams-jvm#62
1 parent 03311a9 commit 88fcda3

File tree

9 files changed

+20
-20
lines changed

9 files changed

+20
-20
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public final void setProducer(Producer producer) {
126126
if (setProducer) {
127127
op.setProducer(p);
128128
} else {
129-
// we execute the request with whatever has been requested (or -1)
129+
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
130130
if (toRequest == Long.MIN_VALUE) {
131-
p.request(-1);
131+
p.request(Long.MAX_VALUE);
132132
} else {
133133
p.request(toRequest);
134134
}

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
6262

6363
@Override
6464
public void request(long n) {
65-
if (n < 0) {
65+
if (n == Long.MAX_VALUE) {
6666
// fast-path without backpressure
6767
while (it.hasNext()) {
6868
if (o.isUnsubscribed()) {
@@ -71,7 +71,7 @@ public void request(long n) {
7171
o.onNext(it.next());
7272
}
7373
o.onCompleted();
74-
} else {
74+
} else if(n > 0) {
7575
// backpressure is requested
7676
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
7777
if (_c == 0) {

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5656

5757
@Override
5858
public void request(long n) {
59-
if (n < 0) {
59+
if (n == Long.MAX_VALUE) {
6060
// fast-path without backpressure
6161
for (long i = index; i <= end; i++) {
6262
if (o.isUnsubscribed()) {

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,8 @@ public MergeProducer(MergeSubscriber<T> ms) {
391391

392392
@Override
393393
public void request(long n) {
394-
if (n < 0) {
395-
requested = -1;
394+
if (n == Long.MAX_VALUE) {
395+
requested = Long.MAX_VALUE;
396396
} else {
397397
REQUESTED.getAndAdd(this, n);
398398
ms.drainQueuesIfNeeded();

rxjava-core/src/main/java/rx/internal/operators/OperatorSkip.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,17 @@ protected Producer onSetProducer(final Producer producer) {
6767

6868
@Override
6969
public void request(long n) {
70-
// add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream
71-
if (n > 0) {
72-
producer.request(n + (toSkip - skipped));
73-
} else {
70+
if (n == Long.MAX_VALUE) {
7471
// infinite so leave it alone
7572
producer.request(n);
73+
} else if (n > 0) {
74+
// add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream
75+
producer.request(n + (toSkip - skipped));
7676
}
7777
}
7878
};
7979
}
80-
80+
8181
};
8282
}
8383
}

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
5252
@Override
5353
public void onStart() {
5454
// we do this to break the chain of the child subscriber being passed through
55-
request(-1);
55+
request(Long.MAX_VALUE);
5656
}
5757

5858
@Override
@@ -109,8 +109,8 @@ void startEmitting() {
109109
@Override
110110
public void request(long n) {
111111
long _c = 0;
112-
if (n < 0) {
113-
requested = -1;
112+
if (n == Long.MAX_VALUE) {
113+
requested = Long.MAX_VALUE;
114114
} else {
115115
_c = REQUESTED_UPDATER.getAndAdd(this, n);
116116
}

rxjava-core/src/test/java/rx/SubscriberTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void request(long n) {
6464
}
6565

6666
});
67-
assertEquals(-1, r.get());
67+
assertEquals(Long.MAX_VALUE, r.get());
6868
}
6969

7070
@Test
@@ -154,8 +154,8 @@ public void request(long n) {
154154
}
155155

156156
});
157-
// this will be -1 because it is decoupled and nothing requsted on the Operator subscriber
158-
assertEquals(-1, r.get());
157+
// this will be Long.MAX_VALUE because it is decoupled and nothing requsted on the Operator subscriber
158+
assertEquals(Long.MAX_VALUE, r.get());
159159
}
160160

161161
@Test

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void testNoBackpressure() {
143143
OnSubscribeFromIterable<Integer> o = new OnSubscribeFromIterable<Integer>(Arrays.asList(1, 2, 3, 4, 5));
144144
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
145145
ts.assertReceivedOnNext(Collections.<Integer> emptyList());
146-
ts.request(-1); // infinite
146+
ts.request(Long.MAX_VALUE); // infinite
147147
o.call(ts);
148148
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
149149
ts.assertTerminalEvent();

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void testNoBackpressure() {
126126
OnSubscribeRange o = new OnSubscribeRange(1, list.size());
127127
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
128128
ts.assertReceivedOnNext(Collections.<Integer> emptyList());
129-
ts.request(-1); // infinite
129+
ts.request(Long.MAX_VALUE); // infinite
130130
o.call(ts);
131131
ts.assertReceivedOnNext(list);
132132
ts.assertTerminalEvent();

0 commit comments

Comments
 (0)