Skip to content

Commit ed9465a

Browse files
committed
Synch docs and examples with spec - request(long)
Refs #62, which changed Subscription API in 718c4d4
1 parent 8d3a0b3 commit ed9465a

File tree

4 files changed

+18
-18
lines changed

4 files changed

+18
-18
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public interface Subscriber<T> {
125125

126126
```java
127127
public interface Subscription {
128-
public void request(int n);
128+
public void request(long n);
129129
public void cancel();
130130
}
131131
````

api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.reactivestreams.example.multicast;
22

3-
import java.util.concurrent.atomic.AtomicInteger;
3+
import java.util.concurrent.atomic.AtomicLong;
44

55
import org.reactivestreams.Subscription;
66
import org.reactivestreams.Subscriber;
@@ -20,11 +20,11 @@ public class StockPricePublisher implements Publisher<Stock> {
2020
public void subscribe(final Subscriber<Stock> s) {
2121
s.onSubscribe(new Subscription() {
2222

23-
AtomicInteger capacity = new AtomicInteger();
23+
AtomicLong capacity = new AtomicLong();
2424
EventHandler handler = new EventHandler(s, capacity);
2525

2626
@Override
27-
public void request(int n) {
27+
public void request(long n) {
2828
if (capacity.getAndAdd(n) == 0) {
2929
// was at 0, so start up consumption again
3030
startConsuming();
@@ -47,16 +47,16 @@ public void startConsuming() {
4747

4848
private static final class EventHandler implements Handler {
4949
private final Subscriber<Stock> s;
50-
private final AtomicInteger capacity;
50+
private final AtomicLong capacity;
5151

52-
private EventHandler(Subscriber<Stock> s, AtomicInteger capacity) {
52+
private EventHandler(Subscriber<Stock> s, AtomicLong capacity) {
5353
this.s = s;
5454
this.capacity = capacity;
5555
}
5656

5757
@Override
5858
public void handle(Stock event) {
59-
int c = capacity.get();
59+
long c = capacity.get();
6060
if (c == 0) {
6161
// shortcut instead of doing decrement/increment loops while no capacity
6262
return;

api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public void subscribe(final Subscriber<Integer> s) {
1818
AtomicInteger capacity = new AtomicInteger();
1919

2020
@Override
21-
public void request(int n) {
21+
public void request(long n) {
2222
System.out.println("signalAdditionalDemand => " + n);
2323
if (capacity.getAndAdd(n) == 0) {
2424
// start sending again if it wasn't already running

api/src/main/java/org/reactivestreams/Subscriber.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,35 @@
33
/**
44
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
55
* <p>
6-
* No further notifications will be received until {@link Subscription#request(int)} is called.
6+
* No further notifications will be received until {@link Subscription#request(long)} is called.
77
* <p>
88
* After signaling demand:
99
* <ul>
10-
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(int)}</li>
10+
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li>
1111
* <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent.
1212
* </ul>
1313
* <p>
14-
* Demand can be signaled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more.
14+
* Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
1515
*
1616
* @param <T> the Type of element signaled.
1717
*/
1818
public interface Subscriber<T> {
1919
/**
2020
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
2121
* <p>
22-
* No data will start flowing until {@link Subscription#request(int)} is invoked.
22+
* No data will start flowing until {@link Subscription#request(long)} is invoked.
2323
* <p>
24-
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted.
24+
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
2525
* <p>
26-
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}.
26+
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
2727
*
2828
* @param s
29-
* {@link Subscription} that allows requesting data via {@link Subscription#request(int)}
29+
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
3030
*/
3131
public void onSubscribe(Subscription s);
3232

3333
/**
34-
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}.
34+
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
3535
*
3636
* @param t the element signaled
3737
*/
@@ -40,7 +40,7 @@ public interface Subscriber<T> {
4040
/**
4141
* Failed terminal state.
4242
* <p>
43-
* No further events will be sent even if {@link Subscription#request(int)} is invoked again.
43+
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
4444
*
4545
* @param t the throwable signaled
4646
*/
@@ -49,7 +49,7 @@ public interface Subscriber<T> {
4949
/**
5050
* Successful terminal state.
5151
* <p>
52-
* No further events will be sent even if {@link Subscription#request(int)} is invoked again.
52+
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
5353
*/
5454
public void onComplete();
5555
}

0 commit comments

Comments
 (0)