diff --git a/api/.gitignore b/api/.gitignore new file mode 100644 index 00000000..5e56e040 --- /dev/null +++ b/api/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/spi/build.sbt b/api/build.sbt similarity index 91% rename from spi/build.sbt rename to api/build.sbt index 6ebe1a9a..22421082 100644 --- a/spi/build.sbt +++ b/api/build.sbt @@ -1,4 +1,4 @@ -name := "reactive-streams-spi" +name := "reactive-streams-api" javacOptions in compile ++= Seq("-encoding", "UTF-8", "-source", "1.6", "-target", "1.6", "-Xlint:unchecked", "-Xlint:deprecation") diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java new file mode 100644 index 00000000..7c7b2a26 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java @@ -0,0 +1,23 @@ +package org.reactivestreams.example.multicast; + +import org.reactivestreams.Publisher; + +public class MulticastExample { + + /** + * Each subscribe will join an existing stream. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new StockPricePublisher(); + + dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite + dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 + } +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java b/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java new file mode 100644 index 00000000..0086e8c1 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java @@ -0,0 +1,75 @@ +package org.reactivestreams.example.multicast; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simulate a network connection that is firing data at you. + *

+ * Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher` + */ +public class NeverEndingStockStream { + + private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream(); + + private NeverEndingStockStream() { + } + + // using array because it is far faster than list/set for iteration + // which is where most of the time in the tight loop will go (... well, beside object allocation) + private volatile Handler[] handlers = new Handler[0]; + + public static synchronized void addHandler(Handler handler) { + if (INSTANCE.handlers.length == 0) { + INSTANCE.handlers = new Handler[] { handler }; + } else { + Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1]; + System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length); + newHandlers[newHandlers.length - 1] = handler; + INSTANCE.handlers = newHandlers; + } + INSTANCE.startIfNeeded(); + } + + public static synchronized void removeHandler(Handler handler) { + // too lazy to do the array handling + HashSet set = new HashSet<>(Arrays.asList(INSTANCE.handlers)); + set.remove(handler); + INSTANCE.handlers = set.toArray(new Handler[set.size()]); + } + + public static interface Handler { + public void handle(Stock event); + } + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong stockIndex = new AtomicLong(); + + private void startIfNeeded() { + if (running.compareAndSet(false, true)) { + new Thread(new Runnable() { + + @Override + public void run() { + while (handlers.length > 0) { + long l = stockIndex.incrementAndGet(); + Stock s = new Stock(l); + for (Handler h : handlers) { + h.handle(s); + } + try { + // just so it is someone sane how fast this is moving + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + running.set(false); + } + + }).start(); + } + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java b/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java new file mode 100644 index 00000000..431e60de --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java @@ -0,0 +1,15 @@ +package org.reactivestreams.example.multicast; + +public class Stock { + + private final long l; + + public Stock(long l) { + this.l = l; + } + + public long getPrice() { + return l; + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java new file mode 100644 index 00000000..78aa3e6f --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -0,0 +1,73 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler; + +/** + * Publisher of stock prices from a never ending stream. + *

+ * It will share a single underlying stream with as many subscribers as it receives. + *

+ * If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc). + */ +public class StockPricePublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + EventHandler handler = new EventHandler(s, capacity); + + @Override + public void request(int n) { + if (capacity.getAndAdd(n) == 0) { + // was at 0, so start up consumption again + startConsuming(); + } + } + + @Override + public void cancel() { + System.out.println("StockPricePublisher => Cancel Subscription"); + NeverEndingStockStream.removeHandler(handler); + } + + public void startConsuming() { + NeverEndingStockStream.addHandler(handler); + } + + }); + + } + + private static final class EventHandler implements Handler { + private final Subscriber s; + private final AtomicInteger capacity; + + private EventHandler(Subscriber s, AtomicInteger capacity) { + this.s = s; + this.capacity = capacity; + } + + @Override + public void handle(Stock event) { + int c = capacity.get(); + if (c == 0) { + // shortcut instead of doing decrement/increment loops while no capacity + return; + } + if (capacity.getAndDecrement() > 0) { + s.onNext(event); + } else { + // we just decremented below 0 so increment back one + capacity.incrementAndGet(); + } + } + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java new file mode 100644 index 00000000..e0a37813 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java @@ -0,0 +1,78 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; + +public class StockPriceSubscriber implements Subscriber { + + private final ArrayBlockingQueue buffer; + private final int delayPerStock; + private volatile boolean terminated = false; + private final int take; + + public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) { + this.buffer = new ArrayBlockingQueue<>(bufferSize); + this.delayPerStock = delayPerStock; + this.take = take; + } + + public StockPriceSubscriber(int bufferSize, int delayPerStock) { + this(bufferSize, delayPerStock, -1); + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity()); + s.request(buffer.remainingCapacity()); + startAsyncWork(s); + } + + @Override + public void onNext(Stock t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("StockPriceSubscriber => Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + int received = 0; + + while (!terminated) { + Stock v = buffer.poll(); + try { + Thread.sleep(delayPerStock); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(buffer.remainingCapacity()); + } + if (v != null) { + received++; + System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice()); + if (take > 0 && received >= take) { + s.cancel(); + terminated = true; + } + } + } + } + }).start(); + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java new file mode 100644 index 00000000..450e0089 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -0,0 +1,52 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; + +class InfiniteIncrementNumberPublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + + final AtomicInteger i = new AtomicInteger(); + + Subscription subscription = new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + + @Override + public void request(int n) { + System.out.println("signalAdditionalDemand => " + n); + if (capacity.getAndAdd(n) == 0) { + // start sending again if it wasn't already running + send(); + } + } + + private void send() { + System.out.println("send => " + capacity.get()); + // this would normally use an eventloop, actor, whatever + new Thread(new Runnable() { + + public void run() { + do { + s.onNext(i.incrementAndGet()); + } while (capacity.decrementAndGet() > 0); + } + }).start(); + } + + @Override + public void cancel() { + capacity.set(-1); + } + + }; + + s.onSubscribe(subscription); + + } +} \ No newline at end of file diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java new file mode 100644 index 00000000..f0d594fa --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java @@ -0,0 +1,64 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; + +class NumberSubscriberThatHopsThreads implements Subscriber { + + final int BUFFER_SIZE = 10; + private final ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); + private volatile boolean terminated = false; + private final String token; + + NumberSubscriberThatHopsThreads(String token) { + this.token = token; + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("onSubscribe => request " + BUFFER_SIZE); + s.request(BUFFER_SIZE); + startAsyncWork(s); + } + + @Override + public void onNext(Integer t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("**** Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + while (!terminated) { + Integer v = buffer.poll(); + try { + Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(BUFFER_SIZE - buffer.size()); + } + if (v != null) { + System.out.println(token + " => Did stuff with v: " + v); + } + } + } + }).start(); + } +} \ No newline at end of file diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java new file mode 100644 index 00000000..59c7e197 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java @@ -0,0 +1,21 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; + +public class UnicastExample { + + /** + * Each subscribe will start a new stream starting at 0. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new InfiniteIncrementNumberPublisher(); + + dataStream.subscribe(new NumberSubscriberThatHopsThreads("A")); + Thread.sleep(2000); + dataStream.subscribe(new NumberSubscriberThatHopsThreads("B")); + } + +} diff --git a/api/src/main/java/org/reactivestreams/Publisher.java b/api/src/main/java/org/reactivestreams/Publisher.java new file mode 100644 index 00000000..3e094965 --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Publisher.java @@ -0,0 +1,20 @@ +package org.reactivestreams; + +public interface Publisher { + + /** + * Request {@link Publisher} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. + *

+ * Each {@link Subscription} will work for only a single {@link Subscriber}. + *

+ * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + *

+ * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will + * signal the error via {@link Subscriber#onError}. + * + * @param s + */ + public void subscribe(Subscriber s); +} diff --git a/api/src/main/java/org/reactivestreams/Subscriber.java b/api/src/main/java/org/reactivestreams/Subscriber.java new file mode 100644 index 00000000..4d28b44a --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Subscriber.java @@ -0,0 +1,55 @@ +package org.reactivestreams; + +/** + * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. + *

+ * No further notifications will be received until {@link Subscription#request(int)} is called. + *

+ * After signaling demand: + *

+ *

+ * Demand can be signalled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more. + * + * @param + */ +public interface Subscriber { + /** + * Invoked after calling {@link Publisher#subscribe(Subscriber)}. + *

+ * No data will start flowing until {@link Subscription#request(int)} is invoked. + *

+ * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. + *

+ * The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}. + * + * @param s + * {@link Subscription} that allows requesting data via {@link Subscription#request(int)} + */ + public void onSubscribe(Subscription s); + + /** + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}. + * + * @param t + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + * + * @param t + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + */ + public void onComplete(); +} diff --git a/api/src/main/java/org/reactivestreams/Subscription.java b/api/src/main/java/org/reactivestreams/Subscription.java new file mode 100644 index 00000000..9c665bdd --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Subscription.java @@ -0,0 +1,32 @@ +package org.reactivestreams; + +/** + * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. + *

+ * It can only be used once by a single {@link Subscriber}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Subscription { + /** + * No events will be sent by a {@link Publisher} until demand is signaled via this method. + *

+ * It can be called however often and whenever needed. + *

+ * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + *

+ * A {@link Publisher} can send less than is requested if the stream ends but + * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. + * + * @param n + */ + public void request(int n); + + /** + * Request the {@link Publisher} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. + */ + public void cancel(); +} diff --git a/build.sbt b/build.sbt index c9ec75fa..c7d2c901 100644 --- a/build.sbt +++ b/build.sbt @@ -10,9 +10,9 @@ homepage in ThisBuild := Some(url("http://www.reactive-streams.org/")) publishTo in ThisBuild := Some("releases" at "https://oss.sonatype.org/service/local/staging/deploy/maven2") -lazy val spi = project +lazy val api = project -lazy val tck = project.dependsOn(spi) +lazy val tck = project.dependsOn(api) publishArtifact := false // for this aggregate project diff --git a/spi/src/main/java/org/reactivestreams/api/Consumer.java b/spi/src/main/java/org/reactivestreams/api/Consumer.java deleted file mode 100644 index d9696edd..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Consumer.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Subscriber; - -/** - * A Consumer is the logical sink of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * This interface is the user-level API for a sink while a Subscriber is the SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the stream of elements. - */ -public interface Consumer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Subscriber Subscriber} for this Consumer. This method should only be used by - * implementations of this API. - * @return the underlying subscriber for this consumer - */ - public Subscriber getSubscriber(); -} \ No newline at end of file diff --git a/spi/src/main/java/org/reactivestreams/api/Processor.java b/spi/src/main/java/org/reactivestreams/api/Processor.java deleted file mode 100644 index da522cd6..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Processor.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.reactivestreams.api; - -/** - * A Processor is a stand-alone representation of a transformation for - * elements from In to Out types. Implementations of this API will provide - * factory methods for creating Processors and connecting them to - * {@link org.reactivestreams.api.Producer Producer} and {@link org.reactivestreams.api.Consumer Consumer}. - */ -public interface Processor extends Consumer, Producer { -} diff --git a/spi/src/main/java/org/reactivestreams/api/Producer.java b/spi/src/main/java/org/reactivestreams/api/Producer.java deleted file mode 100644 index 3e8b5640..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Publisher; - -/** - * A Producer is the logical source of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Publisher Publisher}. - * This interface is the user-level API for a source while a Publisher is the - * SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the produced stream of elements. - */ -public interface Producer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Publisher Publisher} for this Producer. This method should only be used by - * implementations of this API. - * @return the underlying publisher for this producer - */ - public Publisher getPublisher(); - - /** - * Connect the given consumer to this producer. This means that the - * Subscriber underlying the {@link org.reactivestreams.api.Consumer Consumer} subscribes to this Producer’s - * underlying {@link org.reactivestreams.spi.Publisher Publisher}, which will initiate the transfer of the produced - * stream of elements from producer to consumer until either of three things - * happen: - *

- *

- * @param consumer The consumer to register with this producer. - */ - public void produceTo(Consumer consumer); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Publisher.java b/spi/src/main/java/org/reactivestreams/spi/Publisher.java deleted file mode 100644 index 7a4a2154..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Publisher.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Publisher is a source of elements of a given type. One or more {@link org.reactivestreams.spi.Subscriber Subscriber} may be connected - * to this Publisher in order to receive the published elements, contingent on availability of these - * elements as well as the presence of demand signaled by the Subscriber via {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. - */ -public interface Publisher { - - /** - * Subscribe the given {@link org.reactivestreams.spi.Subscriber Subscriber} to this Publisher. A Subscriber can at most be subscribed once - * to a given Publisher, and to at most one Publisher in total. - * @param subscriber The subscriber to register with this publisher. - */ - public void subscribe(Subscriber subscriber); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java b/spi/src/main/java/org/reactivestreams/spi/Subscriber.java deleted file mode 100644 index c900229c..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscriber receives elements from a {@link org.reactivestreams.spi.Publisher Publisher} based on the {@link org.reactivestreams.spi.Subscription Subscription} it has. - * The Publisher may supply elements as they become available, the Subscriber signals demand via - * {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore} and elements from when supply and demand are both present. - */ -public interface Subscriber { - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} generates a {@link org.reactivestreams.spi.Subscription Subscription} upon {@link org.reactivestreams.spi.Publisher#subscribe(org.reactivestreams.spi.Subscriber) subscribe} and passes - * it on to the Subscriber named there using this method. The Publisher may choose to reject - * the subscription request by calling {@link #onError onError} instead. - * @param subscription The subscription which connects this subscriber to its publisher. - */ - public void onSubscribe(Subscription subscription); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to pass one element to this Subscriber. The element - * must not be null. The Publisher must not call this method more often than - * the Subscriber has signaled demand for via the corresponding {@link org.reactivestreams.spi.Subscription Subscription}. - * @param element The element that is passed from publisher to subscriber. - */ - public void onNext(T element); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method in order to signal that it terminated normally. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - */ - public void onComplete(); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to signal that the stream of elements has failed - * and is being aborted. The Subscriber should abort its processing as soon as possible. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - *

- * This method is not intended to pass validation errors or similar from Publisher to Subscriber - * in order to initiate an orderly shutdown of the exchange; it is intended only for fatal - * failure conditions which make it impossible to continue processing further elements. - * @param cause An exception which describes the reason for tearing down this stream. - */ - public void onError(Throwable cause); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscription.java b/spi/src/main/java/org/reactivestreams/spi/Subscription.java deleted file mode 100644 index 3c8b1f47..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscription.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscription models the relationship between a {@link org.reactivestreams.spi.Publisher Publisher} and a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * The Subscriber receives a Subscription so that it can ask for elements to be delivered - * using {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. The Subscription can be disposed of by canceling it. - */ -public interface Subscription { - - /** - * Cancel this subscription. The {@link org.reactivestreams.spi.Publisher Publisher} to which produced this Subscription - * will eventually stop sending more elements to the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns - * this Subscription. This may happen before the requested number of elements has - * been delivered, even if the Publisher would still have more elements. - */ - public void cancel(); - - /** - * Request more data from the {@link org.reactivestreams.spi.Publisher Publisher} which produced this Subscription. - * The number of requested elements is cumulative to the number requested previously. - * The Publisher may eventually publish up to the requested number of elements to - * the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns this Subscription. - * @param elements The number of elements requested. - */ - public void requestMore(int elements); -} \ No newline at end of file diff --git a/tck/bin/.gitignore b/tck/bin/.gitignore new file mode 100644 index 00000000..571ee510 --- /dev/null +++ b/tck/bin/.gitignore @@ -0,0 +1 @@ +/org diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index cf009d13..94d00a43 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -1,18 +1,17 @@ package org.reactivestreams.tck; -import org.reactivestreams.api.Processor; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import java.util.HashSet; +import java.util.Set; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; import org.reactivestreams.tck.TestEnvironment.Promise; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; - public abstract class IdentityProcessorVerification { private final TestEnvironment env; @@ -76,13 +75,13 @@ public Publisher createErrorStatePublisher() { /** * This is the main method you must implement in your test incarnation. - * It must create a Processor, which simply forwards all stream elements from its upstream + * It must create a ReactiveSubject, which simply forwards all stream elements from its upstream * to its downstream. It must be able to internally buffer the given number of elements. */ - public abstract Processor createIdentityProcessor(int bufferSize); + public abstract ReactiveSubject createIdentityReactiveSubject(int bufferSize); /** - * Helper method required for running the Publisher rules against a Processor. + * Helper method required for running the Publisher rules against a ReactiveSubject. * It must create a Publisher for a stream with exactly the given number of elements. * If `elements` is zero the produced stream must be infinite. * The stream must not produce the same element twice (in case of an infinite stream this requirement @@ -104,13 +103,13 @@ public Publisher createErrorStatePublisher() { ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// - // A Processor + // A ReactiveSubject // must obey all Publisher rules on its producing side public Publisher createPublisher(int elements) { - Processor processor = createIdentityProcessor(testBufferSize); + ReactiveSubject processor = createIdentityReactiveSubject(testBufferSize); Publisher pub = createHelperPublisher(elements); - pub.subscribe(processor.getSubscriber()); - return processor.getPublisher(); // we run the PublisherVerification against this + pub.subscribe(processor); + return processor; // we run the PublisherVerification against this } // A Publisher @@ -119,9 +118,9 @@ public Publisher createPublisher(int elements) { public void mustSupportAPendingElementCountUpToLongMaxValue() throws Exception { new TestSetup(env, testBufferSize) {{ TestEnvironment.ManualSubscriber sub = newSubscriber(); - sub.requestMore(Integer.MAX_VALUE); - sub.requestMore(Integer.MAX_VALUE); - sub.requestMore(2); // if the Subscription only keeps an int counter without overflow protection it will now be at zero + sub.request(Integer.MAX_VALUE); + sub.request(Integer.MAX_VALUE); + sub.request(2); // if the Subscription only keeps an int counter without overflow protection it will now be at zero final T x = sendNextTFromUpstream(); expectNextElement(sub, x); @@ -158,15 +157,15 @@ public void mustStartProducingWithTheOldestStillAvailableElementForASubscriber() public void mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception { new TestSetup(env, testBufferSize) {{ ManualSubscriberWithErrorCollection sub1 = new ManualSubscriberWithErrorCollection(env); - env.subscribe(processor.getPublisher(), sub1); + env.subscribe(processor, sub1); ManualSubscriberWithErrorCollection sub2 = new ManualSubscriberWithErrorCollection(env); - env.subscribe(processor.getPublisher(), sub2); + env.subscribe(processor, sub2); - sub1.requestMore(1); + sub1.request(1); expectRequestMore(); final T x = sendNextTFromUpstream(); expectNextElement(sub1, x); - sub1.requestMore(1); + sub1.request(1); // sub1 now has received and element and has 1 pending // sub2 has not yet requested anything @@ -187,11 +186,11 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { ////////////////////// SUBSCRIBER RULES VERIFICATION /////////////////////////// - // A Processor + // A ReactiveSubject // must obey all Subscriber rules on its consuming side public Subscriber createSubscriber(final SubscriberVerification.SubscriberProbe probe) { - Processor processor = createIdentityProcessor(testBufferSize); - processor.getPublisher().subscribe( + ReactiveSubject processor = createIdentityReactiveSubject(testBufferSize); + processor.subscribe( new Subscriber() { public void onSubscribe(final Subscription subscription) { probe.registerOnSubscribe( @@ -201,7 +200,7 @@ public void triggerShutdown() { } public void triggerRequestMore(int elements) { - subscription.requestMore(elements); + subscription.request(elements); } public void triggerCancel() { @@ -223,12 +222,12 @@ public void onError(Throwable cause) { } }); - return processor.getSubscriber(); // we run the SubscriberVerification against this + return processor; // we run the SubscriberVerification against this } ////////////////////// OTHER SPEC RULE VERIFICATION /////////////////////////// - // A Processor + // A ReactiveSubject // must cancel its upstream Subscription if its last downstream Subscription has been cancelled @Test public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled() throws Exception { @@ -241,23 +240,23 @@ public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasB }}; } - // A Processor + // A ReactiveSubject // must immediately pass on `onError` events received from its upstream to its downstream @Test public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { new TestSetup(env, testBufferSize) {{ ManualSubscriberWithErrorCollection sub = new ManualSubscriberWithErrorCollection(env); - env.subscribe(processor.getPublisher(), sub); + env.subscribe(processor, sub); Exception ex = new RuntimeException("Test exception"); sendError(ex); - sub.expectError(ex); // "immediately", i.e. without a preceding requestMore + sub.expectError(ex); // "immediately", i.e. without a preceding request env.verifyNoAsyncErrors(); }}; } - // A Processor + // A ReactiveSubject // must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet @Test public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet() throws Exception { @@ -268,7 +267,7 @@ public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstr final T y = sendNextTFromUpstream(); sub.expectNone(50); - sub.requestMore(2); + sub.request(2); sub.expectNext(x); sub.expectNext(y); @@ -417,7 +416,7 @@ public void mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers() th public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Exception { new TestSetup(env, testBufferSize) {{ TestEnvironment.ManualSubscriber sub1 = newSubscriber(); - sub1.requestMore(20); + sub1.request(20); int totalRequests = expectRequestMore(); final T x = sendNextTFromUpstream(); @@ -442,7 +441,7 @@ public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() thr expectNextElement(sub1, z); sub2.expectNone(); // since sub2 hasn't requested anything yet - sub2.requestMore(1); + sub2.request(1); expectNextElement(sub2, z); if (totalRequests == 3) { @@ -465,7 +464,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws TestEnvironment.ManualSubscriber sub1 = newSubscriber(); TestEnvironment.ManualSubscriber sub2 = newSubscriber(); - sub1.requestMore(testBufferSize + 1); + sub1.request(testBufferSize + 1); int pending = 0; int sent = 0; final T[] tees = (T[]) new Object[testBufferSize]; @@ -500,19 +499,19 @@ public abstract class TestSetup extends ManualPublisher { private TestEnvironment.ManualSubscriber tees; // gives us access to an infinite stream of T values private Set seenTees = new HashSet(); - final Processor processor; + final ReactiveSubject processor; final int testBufferSize; public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { super(env); this.testBufferSize = testBufferSize; tees = env.newManualSubscriber(createHelperPublisher(0)); - processor = createIdentityProcessor(testBufferSize); - subscribe(processor.getSubscriber()); + processor = createIdentityReactiveSubject(testBufferSize); + subscribe(processor); } public TestEnvironment.ManualSubscriber newSubscriber() throws InterruptedException { - return env.newManualSubscriber(processor.getPublisher()); + return env.newManualSubscriber(processor); } public T nextT() throws InterruptedException { diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 7d0babd7..a6e18b4b 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -1,10 +1,7 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; -import org.testng.SkipException; -import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; @@ -14,9 +11,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static org.reactivestreams.tck.TestEnvironment.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.TestSubscriber; +import org.reactivestreams.tck.support.Optional; +import org.testng.SkipException; +import org.testng.annotations.Test; public abstract class PublisherVerification { @@ -212,7 +216,7 @@ public void run(Publisher pub) throws InterruptedException { }); } - // Subscription::requestMore(Int) + // Subscription::request(Int) // when Subscription is cancelled // must ignore the call @Test @@ -222,17 +226,17 @@ public void subscriptionRequestMoreWhenCancelledMustIgnoreTheCall() throws Throw public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); - sub.subscription.value().requestMore(1); // must not throw + sub.subscription.value().request(1); // must not throw } }); } - // Subscription::requestMore(Int) + // Subscription::request(Int) // when Subscription is not cancelled // must register the given number of additional elements to be produced to the respective subscriber // A Publisher // must not call `onNext` - // more times than the total number of elements that was previously requested with Subscription::requestMore by the corresponding subscriber + // more times than the total number of elements that was previously requested with Subscription::request by the corresponding subscriber @Test public void subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements() throws Throwable { activePublisherTest(5, new PublisherTestRun() { @@ -241,21 +245,21 @@ public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); - sub.expectNone("Publisher " + pub + " produced value before the first `requestMore`: "); - sub.requestMore(1); - sub.nextElement("Publisher " + pub + " produced no element after first `requestMore`"); + sub.expectNone("Publisher " + pub + " produced value before the first `request`: "); + sub.request(1); + sub.nextElement("Publisher " + pub + " produced no element after first `request`"); sub.expectNone("Publisher " + pub + " produced unrequested: "); - sub.requestMore(1); - sub.requestMore(2); - sub.nextElements(3, env.defaultTimeoutMillis(), "Publisher " + pub + " produced less than 3 elements after two respective `requestMore` calls"); + sub.request(1); + sub.request(2); + sub.nextElements(3, env.defaultTimeoutMillis(), "Publisher " + pub + " produced less than 3 elements after two respective `request` calls"); sub.expectNone("Publisher " + pub + "produced unrequested "); } }); } - // Subscription::requestMore(Int) + // Subscription::request(Int) // when Subscription is not cancelled // must throw a `java.lang.IllegalArgumentException` if the argument is <= 0 @Test @@ -267,21 +271,21 @@ public void run(Publisher pub) throws Throwable { final ManualSubscriber sub = env.newManualSubscriber(pub); env.expectThrowingOf( IllegalArgumentException.class, - "Calling `requestMore(-1)` a subscription to " + pub + " did not fail with an `IllegalArgumentException`", + "Calling `request(-1)` a subscription to " + pub + " did not fail with an `IllegalArgumentException`", new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(-1); + sub.subscription.value().request(-1); } }); env.expectThrowingOf( IllegalArgumentException.class, - "Calling `requestMore(0)` a subscription to " + pub + " did not fail with an `IllegalArgumentException`", + "Calling `request(0)` a subscription to " + pub + " did not fail with an `IllegalArgumentException`", new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(0); + sub.subscription.value().request(0); } }); sub.cancel(); @@ -323,7 +327,7 @@ public void onNext(T element) { } }; env.subscribe(pub, sub); - sub.requestMore(Integer.MAX_VALUE); + sub.request(Integer.MAX_VALUE); sub.cancel(); Thread.sleep(env.defaultTimeoutMillis()); @@ -349,7 +353,7 @@ public void onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToThe public WeakReference> apply(Publisher pub) throws Exception { ManualSubscriber sub = env.newManualSubscriber(pub); WeakReference> ref = new WeakReference>(sub, queue); - sub.requestMore(1); + sub.request(1); sub.nextElement(); sub.cancel(); return ref; @@ -393,25 +397,25 @@ public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub2 = env.newManualSubscriber(pub); ManualSubscriber sub3 = env.newManualSubscriber(pub); - sub1.requestMore(1); + sub1.request(1); T x1 = sub1.nextElement("Publisher " + pub + " did not produce the requested 1 element on 1st subscriber"); - sub2.requestMore(2); + sub2.request(2); List y1 = sub2.nextElements(2, "Publisher " + pub + " did not produce the requested 2 elements on 2nd subscriber"); - sub1.requestMore(1); + sub1.request(1); T x2 = sub1.nextElement("Publisher " + pub + " did not produce the requested 1 element on 1st subscriber"); - sub3.requestMore(3); + sub3.request(3); List z1 = sub3.nextElements(3, "Publisher " + pub + " did not produce the requested 3 elements on 3rd subscriber"); - sub3.requestMore(1); + sub3.request(1); T z2 = sub3.nextElement("Publisher " + pub + " did not produce the requested 1 element on 3rd subscriber"); - sub3.requestMore(1); + sub3.request(1); T z3 = sub3.nextElement("Publisher " + pub + " did not produce the requested 1 element on 3rd subscriber"); sub3.requestEndOfStream("Publisher " + pub + " did not complete the stream as expected on 3rd subscriber"); - sub2.requestMore(3); + sub2.request(3); List y2 = sub2.nextElements(3, "Publisher " + pub + " did not produce the requested 3 elements on 2nd subscriber"); sub2.requestEndOfStream("Publisher " + pub + " did not complete the stream as expected on 2nd subscriber"); - sub1.requestMore(2); + sub1.request(2); List x3 = sub1.nextElements(2, "Publisher " + pub + " did not produce the requested 2 elements on 1st subscriber"); - sub1.requestMore(1); + sub1.request(1); T x4 = sub1.nextElement("Publisher " + pub + " did not produce the requested 1 element on 1st subscriber"); sub1.requestEndOfStream("Publisher " + pub + " did not complete the stream as expected on 1st subscriber"); diff --git a/tck/src/main/java/org/reactivestreams/tck/ReactiveSubject.java b/tck/src/main/java/org/reactivestreams/tck/ReactiveSubject.java new file mode 100644 index 00000000..4a7ec5a1 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/ReactiveSubject.java @@ -0,0 +1,8 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +public interface ReactiveSubject extends Publisher, Subscriber { + +} diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java index 9969f122..91e85789 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java @@ -1,12 +1,15 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualPublisher; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.Receptacle; import org.testng.annotations.Test; -import static org.reactivestreams.tck.TestEnvironment.*; - public abstract class SubscriberVerification { private final TestEnvironment env; @@ -84,7 +87,7 @@ public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail sub().onSubscribe( new Subscription() { - public void requestMore(int elements) { + public void request(int elements) { env.flop(String.format("Subscriber %s illegally called `subscription.requestMore(%s)`", sub(), elements)); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index fe0c79e1..23c885ad 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -1,9 +1,6 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; +import static org.testng.Assert.fail; import java.util.LinkedList; import java.util.List; @@ -12,7 +9,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.testng.Assert.fail; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.support.Optional; public class TestEnvironment { public static final int TEST_BUFFER_SIZE = 16; @@ -182,8 +182,8 @@ public void onComplete() { received.complete(); } - public void requestMore(int elements) { - subscription.value().requestMore(elements); + public void request(int elements) { + subscription.value().request(elements); } public T requestNextElement() throws InterruptedException { @@ -199,7 +199,7 @@ public T requestNextElement(String errorMsg) throws InterruptedException { } public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { - requestMore(1); + request(1); return nextElement(timeoutMillis, errorMsg); } @@ -212,7 +212,7 @@ public Optional requestNextElementOrEndOfStream(long timeoutMillis) throws In } public Optional requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { - requestMore(1); + request(1); return nextElementOrEndOfStream(timeoutMillis, errorMsg); } @@ -229,12 +229,12 @@ public void requestEndOfStream(String errorMsg) throws InterruptedException { } public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { - requestMore(1); + request(1); expectCompletion(timeoutMillis, errorMsg); } public List requestNextElements(int elements, long timeoutMillis, String errorMsg) throws InterruptedException { - requestMore(elements); + request(elements); return nextElements(elements, timeoutMillis, errorMsg); } @@ -343,7 +343,7 @@ public void subscribe(Subscriber s) { Subscription subs = new Subscription() { @Override - public void requestMore(int elements) { + public void request(int elements) { requests.add(elements); } @@ -379,7 +379,7 @@ public int nextRequestMore() throws InterruptedException { } public int nextRequestMore(long timeoutMillis) throws InterruptedException { - return requests.next(timeoutMillis, "Did not receive expected `requestMore` call"); + return requests.next(timeoutMillis, "Did not receive expected `request` call"); } public int expectRequestMore() throws InterruptedException { @@ -389,7 +389,7 @@ public int expectRequestMore() throws InterruptedException { public int expectRequestMore(long timeoutMillis) throws InterruptedException { int requested = nextRequestMore(timeoutMillis); if (requested <= 0) { - env.flop(String.format("Requests cannot be zero or negative but received requestMore(%s)", requested)); + env.flop(String.format("Requests cannot be zero or negative but received request(%s)", requested)); return 0; // keep compiler happy } else return requested; @@ -402,7 +402,7 @@ public void expectExactRequestMore(int expected) throws InterruptedException { public void expectExactRequestMore(int expected, long timeoutMillis) throws InterruptedException { int requested = expectRequestMore(timeoutMillis); if (requested != expected) - env.flop(String.format("Received `requestMore(%d)` on upstream but expected `requestMore(%d)`", requested, expected)); + env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); } public void expectNoRequestMore() throws InterruptedException { @@ -410,7 +410,7 @@ public void expectNoRequestMore() throws InterruptedException { } public void expectNoRequestMore(long timeoutMillis) throws InterruptedException { - requests.expectNone(timeoutMillis, "Received an unexpected call to: requestMore"); + requests.expectNone(timeoutMillis, "Received an unexpected call to: request"); } public void expectCancelling() throws InterruptedException {