diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java new file mode 100644 index 00000000..7c7b2a26 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java @@ -0,0 +1,23 @@ +package org.reactivestreams.example.multicast; + +import org.reactivestreams.Publisher; + +public class MulticastExample { + + /** + * Each subscribe will join an existing stream. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new StockPricePublisher(); + + dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite + dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 + } +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java b/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java new file mode 100644 index 00000000..0086e8c1 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java @@ -0,0 +1,75 @@ +package org.reactivestreams.example.multicast; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simulate a network connection that is firing data at you. + *

+ * Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher` + */ +public class NeverEndingStockStream { + + private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream(); + + private NeverEndingStockStream() { + } + + // using array because it is far faster than list/set for iteration + // which is where most of the time in the tight loop will go (... well, beside object allocation) + private volatile Handler[] handlers = new Handler[0]; + + public static synchronized void addHandler(Handler handler) { + if (INSTANCE.handlers.length == 0) { + INSTANCE.handlers = new Handler[] { handler }; + } else { + Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1]; + System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length); + newHandlers[newHandlers.length - 1] = handler; + INSTANCE.handlers = newHandlers; + } + INSTANCE.startIfNeeded(); + } + + public static synchronized void removeHandler(Handler handler) { + // too lazy to do the array handling + HashSet set = new HashSet<>(Arrays.asList(INSTANCE.handlers)); + set.remove(handler); + INSTANCE.handlers = set.toArray(new Handler[set.size()]); + } + + public static interface Handler { + public void handle(Stock event); + } + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong stockIndex = new AtomicLong(); + + private void startIfNeeded() { + if (running.compareAndSet(false, true)) { + new Thread(new Runnable() { + + @Override + public void run() { + while (handlers.length > 0) { + long l = stockIndex.incrementAndGet(); + Stock s = new Stock(l); + for (Handler h : handlers) { + h.handle(s); + } + try { + // just so it is someone sane how fast this is moving + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + running.set(false); + } + + }).start(); + } + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java b/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java new file mode 100644 index 00000000..431e60de --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java @@ -0,0 +1,15 @@ +package org.reactivestreams.example.multicast; + +public class Stock { + + private final long l; + + public Stock(long l) { + this.l = l; + } + + public long getPrice() { + return l; + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java new file mode 100644 index 00000000..c1478ea2 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -0,0 +1,73 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler; + +/** + * Publisher of stock prices from a never ending stream. + *

+ * It will share a single underlying stream with as many subscribers as it receives. + *

+ * If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc). + */ +public class StockPricePublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + EventHandler handler = new EventHandler(s, capacity); + + @Override + public void signalAdditionalDemand(int n) { + if (capacity.getAndAdd(n) == 0) { + // was at 0, so start up consumption again + startConsuming(); + } + } + + @Override + public void cancel() { + System.out.println("StockPricePublisher => Cancel Subscription"); + NeverEndingStockStream.removeHandler(handler); + } + + public void startConsuming() { + NeverEndingStockStream.addHandler(handler); + } + + }); + + } + + private static final class EventHandler implements Handler { + private final Subscriber s; + private final AtomicInteger capacity; + + private EventHandler(Subscriber s, AtomicInteger capacity) { + this.s = s; + this.capacity = capacity; + } + + @Override + public void handle(Stock event) { + int c = capacity.get(); + if (c == 0) { + // shortcut instead of doing decrement/increment loops while no capacity + return; + } + if (capacity.getAndDecrement() > 0) { + s.onNext(event); + } else { + // we just decremented below 0 so increment back one + capacity.incrementAndGet(); + } + } + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java new file mode 100644 index 00000000..c598fc6d --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java @@ -0,0 +1,78 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class StockPriceSubscriber implements Subscriber { + + private final ArrayBlockingQueue buffer; + private final int delayPerStock; + private volatile boolean terminated = false; + private final int take; + + public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) { + this.buffer = new ArrayBlockingQueue<>(bufferSize); + this.delayPerStock = delayPerStock; + this.take = take; + } + + public StockPriceSubscriber(int bufferSize, int delayPerStock) { + this(bufferSize, delayPerStock, -1); + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity()); + s.signalAdditionalDemand(buffer.remainingCapacity()); + startAsyncWork(s); + } + + @Override + public void onNext(Stock t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onCompleted() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("StockPriceSubscriber => Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + int received = 0; + + while (!terminated) { + Stock v = buffer.poll(); + try { + Thread.sleep(delayPerStock); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.signalAdditionalDemand(buffer.remainingCapacity()); + } + if (v != null) { + received++; + System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice()); + if (take > 0 && received >= take) { + s.cancel(); + terminated = true; + } + } + } + } + }).start(); + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java new file mode 100644 index 00000000..f7f823ff --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -0,0 +1,52 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class InfiniteIncrementNumberPublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + + final AtomicInteger i = new AtomicInteger(); + + Subscription subscription = new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + + @Override + public void signalAdditionalDemand(int n) { + System.out.println("signalAdditionalDemand => " + n); + if (capacity.getAndAdd(n) == 0) { + // start sending again if it wasn't already running + send(); + } + } + + private void send() { + System.out.println("send => " + capacity.get()); + // this would normally use an eventloop, actor, whatever + new Thread(new Runnable() { + + public void run() { + do { + s.onNext(i.incrementAndGet()); + } while (capacity.decrementAndGet() > 0); + } + }).start(); + } + + @Override + public void cancel() { + capacity.set(-1); + } + + }; + + s.onSubscribe(subscription); + + } +} \ No newline at end of file diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java new file mode 100644 index 00000000..1a8591ce --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java @@ -0,0 +1,64 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class NumberSubscriberThatHopsThreads implements Subscriber { + + final int BUFFER_SIZE = 10; + private final ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); + private volatile boolean terminated = false; + private final String token; + + NumberSubscriberThatHopsThreads(String token) { + this.token = token; + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("onSubscribe => request " + BUFFER_SIZE); + s.signalAdditionalDemand(BUFFER_SIZE); + startAsyncWork(s); + } + + @Override + public void onNext(Integer t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onCompleted() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("**** Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + while (!terminated) { + Integer v = buffer.poll(); + try { + Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.signalAdditionalDemand(BUFFER_SIZE - buffer.size()); + } + if (v != null) { + System.out.println(token + " => Did stuff with v: " + v); + } + } + } + }).start(); + } +} \ No newline at end of file diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java new file mode 100644 index 00000000..59c7e197 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java @@ -0,0 +1,21 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; + +public class UnicastExample { + + /** + * Each subscribe will start a new stream starting at 0. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new InfiniteIncrementNumberPublisher(); + + dataStream.subscribe(new NumberSubscriberThatHopsThreads("A")); + Thread.sleep(2000); + dataStream.subscribe(new NumberSubscriberThatHopsThreads("B")); + } + +} diff --git a/spi/src/main/java/org/reactivestreams/Publisher.java b/spi/src/main/java/org/reactivestreams/Publisher.java new file mode 100644 index 00000000..6bf73c65 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Publisher.java @@ -0,0 +1,17 @@ +package org.reactivestreams; + +public interface Publisher { + + /** + * Request {@link Subscription} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. + *

+ * Each {@link Subscription} will work for only a single {@link Subscriber}. + *

+ * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + * + * @param s + */ + public void subscribe(Subscriber s); +} diff --git a/spi/src/main/java/org/reactivestreams/Subscriber.java b/spi/src/main/java/org/reactivestreams/Subscriber.java new file mode 100644 index 00000000..6450bca3 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Subscriber.java @@ -0,0 +1,55 @@ +package org.reactivestreams; + +/** + * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. + *

+ * No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called. + *

+ * After signaling demand: + *

+ *

+ * Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more. + * + * @param + */ +public interface Subscriber { + /** + * Invoked after calling {@link Publisher#subscribe(Subscriber)}. + *

+ * No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked. + *

+ * It is the resonsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted. + *

+ * The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}. + * + * @param s + * {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)} + */ + public void onSubscribe(Subscription s); + + /** + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}. + * + * @param t + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again. + * + * @param t + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again. + */ + public void onCompleted(); +} diff --git a/spi/src/main/java/org/reactivestreams/Subscription.java b/spi/src/main/java/org/reactivestreams/Subscription.java new file mode 100644 index 00000000..0c2fc456 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Subscription.java @@ -0,0 +1,29 @@ +package org.reactivestreams; + +/** + * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. + *

+ * It can only be used once by a single {@link Subscriber}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Subscription { + /** + * No events will be sent by a {@link Publisher} until demand is signalled via this method. + *

+ * It can be called however often and whenever needed. + *

+ * Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + * + * @param n + */ + public void signalAdditionalDemand(int n); + + /** + * Request the {@link Publisher} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. + */ + public void cancel(); +} diff --git a/spi/src/main/java/org/reactivestreams/api/Consumer.java b/spi/src/main/java/org/reactivestreams/api/Consumer.java deleted file mode 100644 index d9696edd..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Consumer.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Subscriber; - -/** - * A Consumer is the logical sink of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * This interface is the user-level API for a sink while a Subscriber is the SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the stream of elements. - */ -public interface Consumer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Subscriber Subscriber} for this Consumer. This method should only be used by - * implementations of this API. - * @return the underlying subscriber for this consumer - */ - public Subscriber getSubscriber(); -} \ No newline at end of file diff --git a/spi/src/main/java/org/reactivestreams/api/Processor.java b/spi/src/main/java/org/reactivestreams/api/Processor.java deleted file mode 100644 index da522cd6..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Processor.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.reactivestreams.api; - -/** - * A Processor is a stand-alone representation of a transformation for - * elements from In to Out types. Implementations of this API will provide - * factory methods for creating Processors and connecting them to - * {@link org.reactivestreams.api.Producer Producer} and {@link org.reactivestreams.api.Consumer Consumer}. - */ -public interface Processor extends Consumer, Producer { -} diff --git a/spi/src/main/java/org/reactivestreams/api/Producer.java b/spi/src/main/java/org/reactivestreams/api/Producer.java deleted file mode 100644 index 3e8b5640..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Publisher; - -/** - * A Producer is the logical source of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Publisher Publisher}. - * This interface is the user-level API for a source while a Publisher is the - * SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the produced stream of elements. - */ -public interface Producer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Publisher Publisher} for this Producer. This method should only be used by - * implementations of this API. - * @return the underlying publisher for this producer - */ - public Publisher getPublisher(); - - /** - * Connect the given consumer to this producer. This means that the - * Subscriber underlying the {@link org.reactivestreams.api.Consumer Consumer} subscribes to this Producer’s - * underlying {@link org.reactivestreams.spi.Publisher Publisher}, which will initiate the transfer of the produced - * stream of elements from producer to consumer until either of three things - * happen: - *

- *

- * @param consumer The consumer to register with this producer. - */ - public void produceTo(Consumer consumer); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Publisher.java b/spi/src/main/java/org/reactivestreams/spi/Publisher.java deleted file mode 100644 index 7a4a2154..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Publisher.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Publisher is a source of elements of a given type. One or more {@link org.reactivestreams.spi.Subscriber Subscriber} may be connected - * to this Publisher in order to receive the published elements, contingent on availability of these - * elements as well as the presence of demand signaled by the Subscriber via {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. - */ -public interface Publisher { - - /** - * Subscribe the given {@link org.reactivestreams.spi.Subscriber Subscriber} to this Publisher. A Subscriber can at most be subscribed once - * to a given Publisher, and to at most one Publisher in total. - * @param subscriber The subscriber to register with this publisher. - */ - public void subscribe(Subscriber subscriber); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java b/spi/src/main/java/org/reactivestreams/spi/Subscriber.java deleted file mode 100644 index c900229c..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscriber receives elements from a {@link org.reactivestreams.spi.Publisher Publisher} based on the {@link org.reactivestreams.spi.Subscription Subscription} it has. - * The Publisher may supply elements as they become available, the Subscriber signals demand via - * {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore} and elements from when supply and demand are both present. - */ -public interface Subscriber { - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} generates a {@link org.reactivestreams.spi.Subscription Subscription} upon {@link org.reactivestreams.spi.Publisher#subscribe(org.reactivestreams.spi.Subscriber) subscribe} and passes - * it on to the Subscriber named there using this method. The Publisher may choose to reject - * the subscription request by calling {@link #onError onError} instead. - * @param subscription The subscription which connects this subscriber to its publisher. - */ - public void onSubscribe(Subscription subscription); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to pass one element to this Subscriber. The element - * must not be null. The Publisher must not call this method more often than - * the Subscriber has signaled demand for via the corresponding {@link org.reactivestreams.spi.Subscription Subscription}. - * @param element The element that is passed from publisher to subscriber. - */ - public void onNext(T element); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method in order to signal that it terminated normally. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - */ - public void onComplete(); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to signal that the stream of elements has failed - * and is being aborted. The Subscriber should abort its processing as soon as possible. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - *

- * This method is not intended to pass validation errors or similar from Publisher to Subscriber - * in order to initiate an orderly shutdown of the exchange; it is intended only for fatal - * failure conditions which make it impossible to continue processing further elements. - * @param cause An exception which describes the reason for tearing down this stream. - */ - public void onError(Throwable cause); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscription.java b/spi/src/main/java/org/reactivestreams/spi/Subscription.java deleted file mode 100644 index 3c8b1f47..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscription.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscription models the relationship between a {@link org.reactivestreams.spi.Publisher Publisher} and a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * The Subscriber receives a Subscription so that it can ask for elements to be delivered - * using {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. The Subscription can be disposed of by canceling it. - */ -public interface Subscription { - - /** - * Cancel this subscription. The {@link org.reactivestreams.spi.Publisher Publisher} to which produced this Subscription - * will eventually stop sending more elements to the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns - * this Subscription. This may happen before the requested number of elements has - * been delivered, even if the Publisher would still have more elements. - */ - public void cancel(); - - /** - * Request more data from the {@link org.reactivestreams.spi.Publisher Publisher} which produced this Subscription. - * The number of requested elements is cumulative to the number requested previously. - * The Publisher may eventually publish up to the requested number of elements to - * the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns this Subscription. - * @param elements The number of elements requested. - */ - public void requestMore(int elements); -} \ No newline at end of file