diff --git a/README.md b/README.md index 0dd91383..ec21d954 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ public interface Subscriber { ```java public interface Subscription { - public void request(int n); + public void request(long n); public void cancel(); } ```` diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java index 78aa3e6f..a9fab1b4 100644 --- a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -1,6 +1,6 @@ package org.reactivestreams.example.multicast; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscription; import org.reactivestreams.Subscriber; @@ -20,11 +20,11 @@ public class StockPricePublisher implements Publisher { public void subscribe(final Subscriber s) { s.onSubscribe(new Subscription() { - AtomicInteger capacity = new AtomicInteger(); + AtomicLong capacity = new AtomicLong(); EventHandler handler = new EventHandler(s, capacity); @Override - public void request(int n) { + public void request(long n) { if (capacity.getAndAdd(n) == 0) { // was at 0, so start up consumption again startConsuming(); @@ -47,16 +47,16 @@ public void startConsuming() { private static final class EventHandler implements Handler { private final Subscriber s; - private final AtomicInteger capacity; + private final AtomicLong capacity; - private EventHandler(Subscriber s, AtomicInteger capacity) { + private EventHandler(Subscriber s, AtomicLong capacity) { this.s = s; this.capacity = capacity; } @Override public void handle(Stock event) { - int c = capacity.get(); + long c = capacity.get(); if (c == 0) { // shortcut instead of doing decrement/increment loops while no capacity return; diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java index 450e0089..dc1a432e 100644 --- a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java +++ b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -18,7 +18,7 @@ public void subscribe(final Subscriber s) { AtomicInteger capacity = new AtomicInteger(); @Override - public void request(int n) { + public void request(long n) { System.out.println("signalAdditionalDemand => " + n); if (capacity.getAndAdd(n) == 0) { // start sending again if it wasn't already running diff --git a/api/src/main/java/org/reactivestreams/Subscriber.java b/api/src/main/java/org/reactivestreams/Subscriber.java index 66fdd142..f5cb9978 100644 --- a/api/src/main/java/org/reactivestreams/Subscriber.java +++ b/api/src/main/java/org/reactivestreams/Subscriber.java @@ -3,15 +3,15 @@ /** * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. *

- * No further notifications will be received until {@link Subscription#request(int)} is called. + * No further notifications will be received until {@link Subscription#request(long)} is called. *

* After signaling demand: *

    - *
  • One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(int)}
  • + *
  • One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}
  • *
  • Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent. *
*

- * Demand can be signaled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more. + * Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more. * * @param the Type of element signaled. */ @@ -19,19 +19,19 @@ public interface Subscriber { /** * Invoked after calling {@link Publisher#subscribe(Subscriber)}. *

- * No data will start flowing until {@link Subscription#request(int)} is invoked. + * No data will start flowing until {@link Subscription#request(long)} is invoked. *

- * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. + * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted. *

- * The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}. + * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}. * * @param s - * {@link Subscription} that allows requesting data via {@link Subscription#request(int)} + * {@link Subscription} that allows requesting data via {@link Subscription#request(long)} */ public void onSubscribe(Subscription s); /** - * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}. + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}. * * @param t the element signaled */ @@ -40,7 +40,7 @@ public interface Subscriber { /** * Failed terminal state. *

- * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + * No further events will be sent even if {@link Subscription#request(long)} is invoked again. * * @param t the throwable signaled */ @@ -49,7 +49,7 @@ public interface Subscriber { /** * Successful terminal state. *

- * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + * No further events will be sent even if {@link Subscription#request(long)} is invoked again. */ public void onComplete(); }