API/SPI Combination + Contract Details and Examples #37

61 changes: 38 additions & 23 deletions README.md
@@ -34,13 +34,11 @@ In summary, Reactive Streams is a standard and specification for Stream-oriented

The Reactive Streams specification consists of the following parts:

**The SPI** defines the interoperability layer between different implementations.

**The API** specifies the types that the users of Reactive Stream libraries use.
**The API** specifies the types needed to implement Reactive Streams and achieve interoperability between different implementations.
Member

I would prefer to keep calling this the SPI, since we want to express that users are not supposed to implement any of these interfaces directly or invoke anything besides Publisher.subscribe().

Contributor Author

Why wouldn't a user be able to implement a Subscriber? Yes, they need to think about it (invoking subscription.request(n) correctly), but it is not difficult to implement if you follow the interface.

I think it diminishes this standard if we don't design the types for broad use by all developers, not just "library authors".

Perhaps right now we'll all hide it behind our various library types, but if we truly intend this to become part of a future JDK then these types will be exposed and implemented by many and we should assume this from the start.

Member

What I had in mind is somewhat like java.nio.channels.spi.*: sure, some people will implement it, but the vast majority will just use readily made implementations coming with the JDK or other libraries.

When I wrote the above, I had not yet scrolled down to see that you moved things out of the spi package, which makes sense if we assume that we will never add any real end-user API, because then we would mix up different target audiences in the same package (and the NIO example shows that there is precedent in the JDK for keeping these things separate). I’m fine with that, but we should be clear about this decision. In this case the question of API vs SPI becomes purely one of how we talk about it in the documentation, to clarify the intent.

Contributor Author

some people will implement it, but the vast majority will just use readily made implementations coming with the JDK or other libraries.

I agree with this.

I chose "API" for the combined types since that feels like a generic term to me just to represent the public types we are exposing for the standard once we eliminate a division between SPI and API.

I not only eliminated the spi package, but the api package as well so the types just live directly inside org.reactivestreams.

we should be clear about this decision

Definitely, and that's what I'm proposing this pull request for, to combine the API and SPI into a single set of types that serve the combined purpose.

I fully recognize that most people will use prebuilt implementations. The types, though (even when using an implementation), are very good at communicating intent and contract, and are good public interaction points, similar to how in code we often pass Iterable around as a type but rarely implement it directly. This is what I'm aiming for here.


I agree with @benjchristensen here. It is very, very likely that people will want to implement and interact with the base types as users as well as library authors. It also makes the library much more extensible if you can easily extend the base types provided by the library by simply overriding a method and providing your own twist on the functionality, without having to build an entire Reactive Streams implementation. I don't see a downside to exposing these types directly to whoever may wish to implement them in whatever way they wish.

Contributor Author

Just because something requires thought does not mean we hide it. We just need to make things clear rather than confusing and nuanced.

Also, if the Subscriber is that hard to implement we have failed. See my comment at #37 (comment) for how the Subscriber can be simplified. The Subscription is the more complicated item to implement, and if people are going to do so they need to comply with the contract.

A Subscriber should be dead simple to implement, and it really is.

If we make it the responsibility of Subscription.request to be async, then a Subscriber can directly invoke Subscription.request without event loops, trampolines etc which is currently the most nuanced complication of the spec.


A Subscriber should be dead simple to implement, and it really is.

Again, I quote the same sentence from below: "The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher."

Maybe there's still a basic misunderstanding about how this rule is meant. Our previous understanding was that all the asynchronous handoff happened inside the onX methods (and the same for request/cancel). So, it previously meant that the caller of the onX method never has to care for scheduling but it's always the responsibility of the callee to ensure proper scheduling. Has this changed?

In this previous sense, I wouldn't call it "dead simple" to implement a Subscriber in a way that complies with this rule.

Contributor Author

I think "happens outside of the execution stack of the Publisher" needs to be deleted or changed.

The only rule that a Subscriber should have is "don't block".

If it is doing synchronous transformations/filtering/etc, that's fine. If it's blocking a thread on IO, that's not fine.

I should be able to write code like this:

void onNext(T t) {
  queue.offer(transform(t));
}


@benjchristensen Assume I use the RingBuffer for dispatching. In your description, the RingBuffer.tryNext() calls would happen in the Subscription and if no slots were available, then that would be the situation where one would need to apply backpressure. The Subscriber would basically be the RingBuffer's EventHandler and be under the same restrictions it is today without Reactive Streams: "don't block".

Contributor Author

Sorry, my mental picture of that isn't quite clear ... is the RingBuffer the source of the data, or where you're buffering it in the Subscriber?

If in the Subscriber, the Subscription.request will never be larger than slots available in the ring so it would never overflow.

If in the Publisher, then the Subscription would represent the reader (tail pointer) that is following the head as it receives requests via request(n).

if no slots were available, then that would be the situation where one would need to apply back pressure

I don't see in our model how we'd find out that we need to apply back pressure once there is no space. We are requesting up front with a known amount, so we should never hit a scenario where we "run out".


***The Technology Compatibility Kit (TCK)*** is a standard test suite for conformance testing of implementations.

Implementations are free to implement additional features not covered by the specification as long as they conform to the API and SPI requirements and pass the tests in the TCK.
Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK.

#### Comparison with related technologies ####

@@ -50,47 +48,64 @@ Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way

Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves.

### SPI Components ###

The SPI consists of components that are required to be provided by Reactive Stream implementations but these interfaces should not be exposed to libraries or user code that *use* a Reactive Streams implementation. The reason for this is that the methods used on the SPI level have very strict and rather complex semantic requirements which are likely to be violated by end users.
### API Components ###

The components of the SPI are:
The API consists of the following components that are required to be provided by Reactive Stream implementations:

- Publisher
Member

Similarly to the above, I think we should encourage implementations to provide users with facilities that keep them from implementing or using the SPI methods directly, meaning that I would like to keep a variation of the text that you have deleted. We can move these considerations to a separate section below, though, since you are right that this concern does not necessarily belong here.

Contributor Author

As above I disagree. We should not try and hide these types. If they are not comprehensible or usable by the industry at large then we have failed.

Most will not need to implement them, just as I don't implement my own Iterable/Iterator very often, but we should not try to hide or restrict this.

It won't be a very useful standard if we intend on keeping it hidden.


Again, I agree with Ben here, although I do provide my own Iterable/Iterator all the time. I would expect to do the same in the Reactive Streams implementation by providing anonymous classes rather than concrete implementations when the situation called for it.

- Subscriber
- Subscription

A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared.
A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).

A *Subscriber* is a component that accepts a sequenced stream of elements provided by a Publisher. At any given time a Subscriber might be subscribed to at most one Publisher. It provides the callback onNext to be called by the upstream Producer, accepting an element that is to be asynchronously processed or enqueued without blocking the Producer.
The protocol of a `Publisher`/`Subscriber` relationship is defined as:

A Subscriber communicates demand to the Publisher via a *Subscription* which is passed to the Subscriber after the subscription has been established. The Subscription exposes the requestMore(int) method that is used by the Subscriber to signal demand to the Publisher. For each of its subscribers the Publisher obeys the following invariant:
```
onError | (onSubscribe onNext* (onError | onComplete)?)
```

*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.*
- The number of `onNext` events emitted by a `Publisher` to a `Subscriber` will at no point in time exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`.
- A `Publisher` can send fewer events than requested and end the `Subscription` by emitting `onComplete` or `onError`.
- Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications).
Member

This could be interpreted as “Subscriber.onNext does not need to be thread-safe”. Is this the intention?

Contributor Author

Yes. My perspective is that each stream should be sequential and parallelizing a stream should be an explicit request (out of scope of this interface) to split into multiple streams.

Member

Makes sense. It does put the burden on the Publisher to ensure proper synchronization, though. That is okay, but since it might be surprising I think it deserves explicit mention.

Contributor Author

Do you want something like this?

  • Events sent to a Subscriber can only be sent sequentially (no concurrent notifications). This means a Publisher must ensure correct serialization of events if the data source may emit concurrently.

Member

Yes, that works, and it goes hand in hand with the rule that a Subscriber can have at most one Subscription.

- If a `Publisher` fails it must emit an `onError`.
- If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`.
- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` must be considered canceled.
- Once a terminal state has been signaled (`onError`, `onComplete`), no further events can be sent.
- Upon receiving a `Subscription.cancel` request, the `Publisher` should stop sending events as soon as it can.
- Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`.
- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
Member

Should we mandate that this does not throw exceptions for rejecting subscription requests, meaning that calling onError is the only legal way to actively reject a subscription?

Contributor Author

I think so and that's what I had in mind. I'll make the change and then if anyone else has issues it can be further debated.

Member

sounds good

- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as not to cause a stack overflow, since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolates the async responsibility to the `Subscription` instance, which has responsibility for scheduling events.


I really like this! Makes code like this legit:

public void onNext(T elem) {
  if (isStillValid(elem)) {
    process(elem);
    subscription.request(1); // acknowledgement, next please
  } else {
    subscription.cancel(); // no longer interested, stop please
  }
}

+1

Contributor Author

Another possible approach for async processing is to use an event loop, like this:

void process() {
    eventLoop.schedule(() -> {
        T t;
        while ((t = queue.poll()) != null) {
            doWork(t);
            if (queue.size() < THRESHOLD) {
                subscription.request(queue.capacity());
            }
        }
    });
}

Member

In principle we are free to choose either way to break up the potential infinite loop: allowing synchronous Publisher or allowing synchronous Subscriber. What we have currently allows synchronous Publishers and disallows synchronous Subscribers. The reasoning behind that choice was that producing a stream from a strict collection should be cheap. With this proposed change there would be a huge cost for this rather common use case, meaning that streaming a List would necessarily need to involve a task scheduler (e.g. thread pool or actor). Can you please elaborate how this cost is offset and amortized by the benefits of allowing a synchronous Subscriber?
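A minimal sketch of what the proposed rule would mean for a Publisher backed by a strict collection, assuming a per-publisher single-threaded Executor and the Publisher/Subscriber/Subscription signatures used elsewhere in this pull request (the class and field names here are made up): request only records demand and schedules a drain task, so elements are never emitted on the caller's stack.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical sketch: streaming a strict List while keeping Subscription.request
// off the caller's stack by handing emission to an Executor.
public class ListPublisher<T> implements Publisher<T> {

    private final List<T> list;
    private final Executor executor = Executors.newSingleThreadExecutor(); // a shared scheduler in practice

    public ListPublisher(List<T> list) {
        this.list = list;
    }

    @Override
    public void subscribe(final Subscriber<T> subscriber) {
        final Iterator<T> it = list.iterator();
        final AtomicLong demand = new AtomicLong();
        final AtomicBoolean done = new AtomicBoolean(false);

        // drains as many elements as there is outstanding demand for
        final Runnable drain = new Runnable() {
            @Override
            public void run() {
                while (!done.get() && demand.get() > 0 && it.hasNext()) {
                    subscriber.onNext(it.next());
                    demand.decrementAndGet();
                }
                if (!it.hasNext() && done.compareAndSet(false, true)) {
                    subscriber.onComplete();
                }
            }
        };

        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(int n) {
                demand.addAndGet(n);
                executor.execute(drain); // never emits on the caller's stack
            }

            @Override
            public void cancel() {
                done.set(true); // the drain loop observes this and stops
            }
        });
    }
}
```

In this sketch the scheduling cost is one Executor hand-off per request call rather than per element, which is one way the overhead of streaming a List could be amortized.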


Subscribers that do not currently have an active subscription may subscribe to a Publisher. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated.

> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers.
A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`.
Member

For completeness we should also mention here that the Subscriber must not call any methods on Subscription from within the onNext method (i.e. beneath this stack frame), instead it must dispatch such processing asynchronously. This is necessary to avoid stack-overflowing loops of request–onNext–request.

Contributor Author

I'll add a sentence to it for you to review.


In this paragraph (see old line 65 above) the word asynchronous is gone. Before the last part of the sentence was "accepting an element that is to be asynchronously processed or enqueued without blocking the Producer". What's the reason for this change?

Contributor Author

Because it's misleading to say synchronous or asynchronous here as it's not black or white. The only important thing is not blocking, but that does not mean synchronous processing can't happen.

It is left to later in the document under "Asynchronous Processing" and "Relationship to synchronous stream-processing" to discuss the async/sync relationship.


I'm not sure if we disagree or not. Later this document says "The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher." This sounds as if it would refer to some previous description and explain it further. With asynchronous gone it seems that there's nothing in this paragraph that would actually spec it. (Maybe it's just a matter of wording.)


I think @benjchristensen is thinking about a publisher producing a subscription that does this:

new Subscription() {
  // ...
  public void request(int nr) {
    AtomicInteger requestedNr = new AtomicInteger(nr);

    schedule(() -> {
      while (requestedNr.getAndDecrement() > 0)
        subscriber.onNext(something);
    });
  }
}

In the above example Subscription.request is scheduled to run asynchronously, but the individual onNext events in a batch aren't. And because onNext can only happen in response to a request call, which must be asynchronous, the publisher either has to block its loop waiting for request to happen, or it has to schedule the batch of onNext calls to happen asynchronously after request.

I think some examples would do well to shed light on such issues.

Contributor Author

Let's fix this sentence => "The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher."

It's restrictive, confusing and difficult.

Contributor Author

I find this clearer and more precise:

The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) does not block the Publisher. Each of the on* handlers can process the events synchronously or asynchronously.
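A minimal sketch of that distinction, with made-up names and assumed callback signatures (onSubscribe(Subscription), onNext(T), onError(Throwable), onComplete()): onNext does cheap synchronous work and merely enqueues anything heavier, so it never blocks the Publisher.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical sketch: cheap synchronous work in onNext is fine; anything that
// could block is only enqueued here and performed elsewhere.
public class NonBlockingSubscriber implements Subscriber<String> {

    private final Queue<String> pending = new ConcurrentLinkedQueue<String>();
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(16); // initial demand
    }

    @Override
    public void onNext(String element) {
        pending.offer(element.toUpperCase()); // synchronous, but does not block the Publisher
        subscription.request(1);              // legal to call directly under the proposed rule
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // no more elements will arrive
    }
}
```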


- `Subscriber` can be used once-and-only-once to subscribe to a `Publisher`.

At any time the Publisher may signal that it is not able to provide more elements. This is done by invoking onComplete on its subscribers.

> For example a Publisher representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete.
A `Subscriber` communicates demand to the `Publisher` via a *`Subscription`* which is passed to the `Subscriber` after the subscription has been established. The `Subscription` exposes the `request(int)` method that is used by the `Subscriber` to signal demand to the `Publisher`.


I think the Subscriber should also be able to communicate back to the Publisher that events should stop, by means of Subscription.cancel(), which also has asynchronous behavior; that should be mentioned here as well.

Contributor Author

That probably makes sense.

In both of these cases I think it really means "don't do work" ... if all that is happening is setting a boolean flag (such as canceled = true) then nothing needs to be scheduled on a separate thread, but the actual work of emitting or unsubscribing should not be done.

What's the best way of describing this?
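A minimal sketch of that idea, assuming Subscription has just the request(int) and cancel() methods used in the examples in this pull request (the class and field names are made up): both methods only record state, and the Publisher's emission loop, running elsewhere, does the actual work of emitting or tearing down.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscription;

// Sketch: Subscription methods only flip flags and counters; the Publisher's
// own emission loop reads these fields and does the real work.
public class FlagOnlySubscription implements Subscription {

    final AtomicLong demand = new AtomicLong();
    final AtomicBoolean cancelled = new AtomicBoolean(false);

    @Override
    public void request(int n) {
        demand.addAndGet(n); // cheap bookkeeping, safe to call from within onNext
    }

    @Override
    public void cancel() {
        cancelled.set(true); // the emission loop stops once it observes this
    }
}
```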


### API components ###
- A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`.
- Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching.

The purpose of the API is to provide the types that users interact with directly. SPI methods and interfaces should not be exposed except for the purpose of writing Reactive Streams implementations.
For each of its subscribers the `Publisher` obeys the following invariant:

The API counterpart for Publisher is *Producer* and for Subscriber is *Consumer*. The combination of these two—a stream processing element with asynchronous input and output—is called *Processor*.
*If N is the total number of demand tokens handed to the `Publisher` P by a `Subscriber` S during the time period up to a time T, then the number of `onNext` calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the `Publisher` separately for each of its subscribers.*
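A small sketch of that bookkeeping, with made-up names (it is essentially what the EventHandler in the StockPricePublisher example further down in this pull request does): one counter per subscriber, decremented before each onNext and never driven below zero.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;

// One instance per Subscriber: elements are only forwarded while demand remains.
public class DemandGuard<T> {

    private final AtomicLong pending = new AtomicLong(); // tokens received minus elements emitted

    public void moreRequested(int n) {
        pending.addAndGet(n);
    }

    public boolean tryEmit(Subscriber<T> subscriber, T element) {
        if (pending.getAndDecrement() > 0) {
            subscriber.onNext(element);
            return true;
        }
        pending.incrementAndGet(); // dipped below zero, restore
        return false; // no outstanding demand: drop, buffer or pause
    }
}
```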

`Subscriber`s that do not currently have an active subscription may subscribe to a `Publisher`. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated.

> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers.

The only operation supported by any Producer–Consumer pair is their ability to establish a connection for the purpose of transferring the stream of elements from Producer to Consumer; this is achieved by the method `produceTo()`. Concrete implementations of Reactive Streams are expected to offer a rich set of combinators and transformations, but these are not the subject of this specification. The reason is that implementations shall have the freedom to formulate the end-user API in an idiomatic fashion for the respective platform, language and use-case they target.
At any time the `Publisher` may signal that it is not able to provide more elements. This is done by invoking `onComplete` on its subscribers.

In addition there is one method each on Producer and Consumer to obtain a reference to the underlying Publisher or Subscriber, respectively. These are necessary for implementations, but is not to be considered end-user API.
> For example a `Publisher` representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete.

### Asynchronous processing ###

The Reactive Streams SPI prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its SPI-mandated methods shall return as quickly as possible.
The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) happens outside of the execution stack of the `Publisher`. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The `Subscriber` should make sure to minimize the number of processing steps used to initiate this process, meaning that all its API-mandated methods shall return as quickly as possible. Note that this does not mean synchronous processing is not permitted; see "Relationship to synchronous stream-processing" below.

In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the Subscription: the subscriber controls the maximum amount of future elements it is willing to receive by sending explicit demand tokens (by calling requestMore(int)).
In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the `Subscription`: the subscriber controls the maximum number of future elements it is willing to receive by sending explicit demand tokens (by calling request(int)).
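As an illustration of that control channel from the consuming side, a sketch with made-up names, an arbitrary window size, and assumed callback signatures (e.g. onError(Throwable)): the subscriber keeps a bounded window of outstanding demand and tops it up as elements are processed.

```java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class WindowedSubscriber<T> implements Subscriber<T> {

    private static final int WINDOW = 64;

    private Subscription subscription;
    private int received; // elements seen since the last top-up

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(WINDOW); // initial demand
    }

    @Override
    public void onNext(T element) {
        process(element); // cheap, non-blocking work per element
        if (++received == WINDOW / 2) {
            received = 0;
            subscription.request(WINDOW / 2); // top the window back up
        }
    }

    @Override
    public void onError(Throwable t) {
        // handle failure; no further events will arrive
    }

    @Override
    public void onComplete() {
        // stream finished
    }

    private void process(T element) {
        // application-specific, non-blocking processing
    }
}
```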

#### Relationship to synchronous stream-processing ####

4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -1,3 +1,3 @@
addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0")
Member

this plugin is now added twice ;-)

Contributor Author

Oh! I'll remove that :-)


addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0")
1 change: 1 addition & 0 deletions spi/.gitignore
@@ -0,0 +1 @@
/bin
@@ -0,0 +1,23 @@
package org.reactivestreams.example.multicast;
Contributor

should the examples be located in the spi project? Unnecessary to ship them with the jar files.


Maybe even have their own repo at the top level so we can have examples in various languages, etc...?

Contributor Author

The /src/examples folder would be excluded the same way /src/test is. It's just another source folder; that doesn't mean it gets packaged.

Contributor Author

I agree with having a top level /examples/ with sub-modules for each language.


import org.reactivestreams.Publisher;

public class MulticastExample {

    /**
     * Each subscribe will join an existing stream.
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String... args) throws InterruptedException {
        Publisher<Stock> dataStream = new StockPricePublisher();

        dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite
        dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite
        Thread.sleep(5000);
        dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20
        Thread.sleep(5000);
        dataStream.subscribe(new StockPriceSubscriber(10, 222, 20)); // 222ms on each event, take 20
    }
}
@@ -0,0 +1,75 @@
package org.reactivestreams.example.multicast;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Simulate a network connection that is firing data at you.
 * <p>
 * Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher`
 */
public class NeverEndingStockStream {

    private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream();

    private NeverEndingStockStream() {
    }

    // using an array because it is far faster than list/set for iteration,
    // which is where most of the time in the tight loop will go (... well, besides object allocation)
    private volatile Handler[] handlers = new Handler[0];

    public static synchronized void addHandler(Handler handler) {
        if (INSTANCE.handlers.length == 0) {
            INSTANCE.handlers = new Handler[] { handler };
        } else {
            Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1];
            System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length);
            newHandlers[newHandlers.length - 1] = handler;
            INSTANCE.handlers = newHandlers;
        }
        INSTANCE.startIfNeeded();
    }

    public static synchronized void removeHandler(Handler handler) {
        // too lazy to do the array handling
        HashSet<Handler> set = new HashSet<>(Arrays.asList(INSTANCE.handlers));
        set.remove(handler);
        INSTANCE.handlers = set.toArray(new Handler[set.size()]);
    }

    public static interface Handler {
        public void handle(Stock event);
    }

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong stockIndex = new AtomicLong();

    private void startIfNeeded() {
        if (running.compareAndSet(false, true)) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    while (handlers.length > 0) {
                        long l = stockIndex.incrementAndGet();
                        Stock s = new Stock(l);
                        for (Handler h : handlers) {
                            h.handle(s);
                        }
                        try {
                            // just so it is somewhat sane how fast this is moving
                            Thread.sleep(1);
                        } catch (InterruptedException e) {
                            // ignore and keep publishing
                        }
                    }
                    running.set(false);
                }

            }).start();
        }
    }

}
@@ -0,0 +1,15 @@
package org.reactivestreams.example.multicast;

public class Stock {

    private final long l;

    public Stock(long l) {
        this.l = l;
    }

    public long getPrice() {
        return l;
    }

}
@@ -0,0 +1,73 @@
package org.reactivestreams.example.multicast;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Subscription;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler;

/**
 * Publisher of stock prices from a never ending stream.
 * <p>
 * It will share a single underlying stream with as many subscribers as it receives.
 * <p>
 * If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc).
 */
public class StockPricePublisher implements Publisher<Stock> {

    @Override
    public void subscribe(final Subscriber<Stock> s) {
        s.onSubscribe(new Subscription() {

            AtomicInteger capacity = new AtomicInteger();
            EventHandler handler = new EventHandler(s, capacity);

            @Override
            public void request(int n) {
                if (capacity.getAndAdd(n) == 0) {
                    // was at 0, so start up consumption again
                    startConsuming();
                }
            }

            @Override
            public void cancel() {
                System.out.println("StockPricePublisher => Cancel Subscription");
                NeverEndingStockStream.removeHandler(handler);
            }

            public void startConsuming() {
                NeverEndingStockStream.addHandler(handler);
            }

        });

    }

    private static final class EventHandler implements Handler {
        private final Subscriber<Stock> s;
        private final AtomicInteger capacity;

        private EventHandler(Subscriber<Stock> s, AtomicInteger capacity) {
            this.s = s;
            this.capacity = capacity;
        }

        @Override
        public void handle(Stock event) {
            int c = capacity.get();
            if (c == 0) {
                // shortcut instead of doing decrement/increment loops while no capacity
                return;
            }
            if (capacity.getAndDecrement() > 0) {
                s.onNext(event);
            } else {
                // we just decremented below 0 so increment back one
                capacity.incrementAndGet();
            }
        }
    }

}