From 399ed343b89750c2fadd3d23799ab38381acd7cb Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 19 May 2014 09:17:46 -0700 Subject: [PATCH] Contract Definition Results of discussions in #40, #19, #37, #41 and #46 --- README.md | 131 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 93 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index ff044502..7051cec6 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ The latest preview release is available on Maven Central as Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine. -The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. +The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application. @@ -34,73 +34,130 @@ In summary, Reactive Streams is a standard and specification for Stream-oriented The Reactive Streams specification consists of the following parts: -**The SPI** defines the interoperablility layer between different implementations. - -**The API** specifies the types that the users of Reactive Stream libraries use. +**The API** specifies the types to implement Reactive Streams and achieve interoperablility between different implementations. ***The Technology Compatibility Kit (TCK)*** is a standard test suite for conformance testing of implementations. -Implementations are free to implement additional features not covered by the specification as long as they conform to the API and SPI requirements and pass the tests in the TCK. +Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK. -#### Comparison with related technologies #### +### API Components ### -In contrast to reactive streams described in this document, a Future represents exactly one element (or a failure) that is produced asynchronosly while streams can provide a potentially unbounded number of elements. +The API consists of the following components that are required to be provided by Reactive Stream implementations: -Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way to handle back-pressure and requires the processing of an element by a dowstream component to be dispatched asynchronously. + - Publisher + - Subscriber + - Subscription -Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves. +A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). -### SPI Components ### +In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol: -The SPI consists of components that are required to be provided by Reactive Stream implementations but these interfaces should not be exposed to libraries or user code that *use* a Reactive Streams implementation. The reason for this is that the methods used on the SPI level have very strict and rather complex semantic requirements which are likely to be violated by end users. +``` +onError | (onSubscribe onNext* (onError | onComplete)?) +``` -The components of the SPI are: - - Publisher - - Subscriber - - Subscription +NOTE: The specifications below use binding words in CAPLOCKS from https://www.ietf.org/rfc/rfc2119.txt + +#### Subscriber ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscriber.java)) + +```java +public interface Subscriber { + public void onSubscribe(Subscription s); + public void onNext(T t); + public void onError(Throwable t); + public void onComplete(); +} +```` -A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared. +- A `Subscriber` MUST NOT block a `Publisher` thread. +- A `Subscriber` MUST signal demand via `Subscription.request` to receive notifications. +- A `Subscriber` MAY behave synchronously or asynchronously but SHOULD NOT synchronously perform heavy computations in its methods (`onNext`, `onError`, `onComplete`, `onSubscribe`). -A *Subscriber* is a component that accepts a sequenced stream of elements provided by a Publisher. At any given time a Subscriber might be subscribed to at most one Publisher. It provides the callback onNext to be called by the upstream Producer, accepting an element that is to be asynchronously processed or enqueued without blocking the Producer. -A Subscriber communicates demand to the Publisher via a *Subscription* which is passed to the Subscriber after the subscription has been established. The Subscription exposes the requestMore(int) method that is used by the Subscriber to signal demand to the Publisher. For each of its subscribers the Publisher obeys the following invariant: +#### Publisher ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Publisher.java)) -*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.* +```java +public interface Publisher { + public void subscribe(Subscriber s); +} +```` -Subscribers that do not currently have an active subscription may subscribe to a Publisher. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated. +- The number of `onNext` events emitted by a `Publisher` to a `Subscriber` MUST NOT exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`. +- A `Publisher` MAY send less events than requested and terminate the `Subscription` by calling `onComplete` or `onError`. +- Events sent to a `Subscriber` MUST be sent sequentially (no concurrent notifications). +- If a `Publisher` fails it MUST emit an `onError`. +- If a `Publisher` terminates successfully (finite stream) it MUST emit an `onComplete`. +- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered canceled. +- Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further events can be sent. +- Upon receiving a `Subscription.cancel` request it SHOULD stop sending events as soon as it can. +- `Subscription`'s which have been canceled SHOULD NOT receive subsequent `onError` or `onComplete` events, but implementations will not be able to strictly guarantee this in all cases due to the intrinsic race condition between actions taken concurrently by `Publisher` and `Subscriber`. +- A `Publisher` SHOULD NOT throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method. +- The `Subscriber.onSubscribe` method on a given `Subscriber` instance MUST NOT be called more than once. +- The `Publisher.subscribe` method MAY be called as many times as wanted but MUST be with a different `Subscriber` each time. +- A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. +- A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc...). If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`". -> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers. -At any time the Publisher may signal that it is not able to provide more elements. This is done by invoking onComplete on its subscribers. -> For example a Publisher representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. +#### Subscription ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscription.java)) -### API components ### +```java +public interface Subscription { + public void request(int n); + public void cancel(); +} +```` -The purpose of the API is to provide the types that users interact with directly. SPI methods and interfaces should not be exposed expect for the purpose of writing Reactive Streams implementations. +- A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. +- Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching. +- The `Subscription.request` method MUST assume that it will be invoked synchronously and MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext`. +- The `Subscription.request` method SHOULD NOT synchronously perform heavy computations. +- The `Subscription.cancel` method MUST assume that it will be invoked synchronously and SHOULD NOT synchronously perform heavy computations. -The API counterpart for Publisher is *Producer* and for Subscriber is *Consumer*. The combination of these two—a stream processing element with asynchronous input and output—is called *Processor*. -The only operation supported by any Producer–Consumer pair is their ability to establish a connection for the purpose of transferring the stream of elements from Producer to Consumer; this is achieved by the method `produceTo()`. Concrete implementations of Reactive Streams are expected to offer a rich set of combinators and transformations, but these are not the subject of this specification. The reason is that implementations shall have the freedom to formulate the end-user API in an idiomatic fashion for the respective platform, language and use-case they target. -In addition there is one method each on Producer and Consumer to obtain a reference to the underlying Publisher or Subscriber, respectively. These are necessary for implementations, but is not to be considered end-user API. -### Asynchronous processing ### +### Asynchronous vs Synchronous Processing ### -The Reactive Streams SPI prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its SPI-mandated methods shall return as quickly as possible. +The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) MUST NOT *block* the `Publisher`. However, each of the `on*` handlers can process the events synchronously or asynchronously. -In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the Subscription: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling requestMore(int)). +Take this example: -#### Relationship to synchronous stream-processing #### +``` +nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput) +``` -This document describes asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C: +It has an async origin and an async destination. Let's assume that both origin and destination are selector event loops. The `Subscription.request(n)` must be chained from the destination to the origin. This is now where each implementation can choose how do do this. - (...) --> A[S1 --> S2] --> B[S3 --> S4 --> S5] --> C[S6] --> (...) +The following uses the pipe `|` character to signal async boundaries (queue and schedule) and `R#` to represent resources (possibly threads). + +``` +nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput) +-------------- R1 ---- | - R2 - | -- R3 --- | ---------- R4 ---------------- +``` + +In this example each of the 3 consumers, `map`, `filter` and `consumeTo` asynchronously schedule the work. It could be on the same event loop (trampoline), separate threads, whatever. + +``` +nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput) +------------------- R1 ----------------- | ---------- R2 ---------------- +``` + +Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. The `map` and `filter` steps are synchronously performed on the origin thread. + +Or another implementation could fuse the operations to the final consumer: + +``` +nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput) +--------- R1 ---------- | ------------------ R2 ------------------------- +``` + +All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity. + +The Reactive Streams contract allows implementations the flexibility to manage resources and scheduling and mix asynchronous and synchronous processing within the bounds of a non-blocking, asynchronous, push-based stream. -Processor B is implemented in terms of three synchronous steps S3, S4 and S5. When communicating with its upstream Producer A, or its downstream Subscriber C it obeys the asynchronous, back-pressure aware requirements of the SPI, but internally it drives the synchronous stream of S3, S4, S5. -> Please note that processing usually happens pipelined between A, B and C: assuming a stream of elements (E1, E2, E3) A might start processing E2 while C still processes E1. On the other hand inside A execution can be completely synchronous, so E3 might be only processed by S1 until E2 has left S2. ### Subscriber controlled queue bounds ### @@ -119,8 +176,6 @@ These bounds must be respected by a publisher independent of whether the source Subscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips). -> Systems that use a signal to notify the publisher to suspend publishing cannot guarantee bounded queues. Since there is a delay between the time at which the signal has been raised and when it is processed, there is a window of time during which an arbitrary number of elements can be passed to the subscriber. - ## Legal This project is a collaboration between engineers from Netflix, Twitter, RedHat, Pivotal, Typesafe and many others. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`.