Skip to content

Synch docs and examples with spec - request(long) #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public interface Subscriber<T> {

```java
public interface Subscription {
public void request(int n);
public void request(long n);
public void cancel();
}
````
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.reactivestreams.example.multicast;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscription;
import org.reactivestreams.Subscriber;
Expand All @@ -20,11 +20,11 @@ public class StockPricePublisher implements Publisher<Stock> {
public void subscribe(final Subscriber<Stock> s) {
s.onSubscribe(new Subscription() {

AtomicInteger capacity = new AtomicInteger();
AtomicLong capacity = new AtomicLong();
EventHandler handler = new EventHandler(s, capacity);

@Override
public void request(int n) {
public void request(long n) {
if (capacity.getAndAdd(n) == 0) {
// was at 0, so start up consumption again
startConsuming();
Expand All @@ -47,16 +47,16 @@ public void startConsuming() {

private static final class EventHandler implements Handler {
private final Subscriber<Stock> s;
private final AtomicInteger capacity;
private final AtomicLong capacity;

private EventHandler(Subscriber<Stock> s, AtomicInteger capacity) {
private EventHandler(Subscriber<Stock> s, AtomicLong capacity) {
this.s = s;
this.capacity = capacity;
}

@Override
public void handle(Stock event) {
int c = capacity.get();
long c = capacity.get();
if (c == 0) {
// shortcut instead of doing decrement/increment loops while no capacity
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void subscribe(final Subscriber<Integer> s) {
AtomicInteger capacity = new AtomicInteger();

@Override
public void request(int n) {
public void request(long n) {
System.out.println("signalAdditionalDemand => " + n);
if (capacity.getAndAdd(n) == 0) {
// start sending again if it wasn't already running
Expand Down
20 changes: 10 additions & 10 deletions api/src/main/java/org/reactivestreams/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,35 @@
/**
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
* <p>
* No further notifications will be received until {@link Subscription#request(int)} is called.
* No further notifications will be received until {@link Subscription#request(long)} is called.
* <p>
* After signaling demand:
* <ul>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(int)}</li>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li>
* <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent.
* </ul>
* <p>
* Demand can be signaled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more.
* Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
*
* @param <T> the Type of element signaled.
*/
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(int)} is invoked.
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted.
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}.
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(int)}
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);

/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}.
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
*
* @param t the element signaled
*/
Expand All @@ -40,7 +40,7 @@ public interface Subscriber<T> {
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(int)} is invoked again.
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*
* @param t the throwable signaled
*/
Expand All @@ -49,7 +49,7 @@ public interface Subscriber<T> {
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(int)} is invoked again.
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*/
public void onComplete();
}