Multicast Requirement #19
I agree with @benjchristensen here :) |
I fully agree with the part about not mandating multicast: it is a complication that does not pull its own weight (due to there being so many different useful variants that picking a default is too limiting). The remaining question is whether or not it should be allowed to call What you have in mind is the Rx way of the Subscription being the “active” entity in the sense of holding the state relevant between Observable and Observer, but I would prefer this spec to be more general and allowing the Publisher to be the active (asynchronous) entity. Thinking about the user-visible contract I prefer very clear semantics:
This behavior matches Rx practices as far as I can see, the difference is that in Rx the Subject and any other Observable are not distinguished in the type system. In this spec I would propose to reserve Publisher for single-output streams, have an API element that explicitly supports fan-out (à la Rx Subject) and keep the high-level combinators DSL outside the scope as we currently have (i.e. the transformation methods on Observable are library specific and this spec does not say anything about them or their semantics). |
👍 @rkuhn! |
I think a If a I would not add anything to this spec for fan-out, that's for libraries to do. |
There is a relationship between subscribing to a Publisher more than once and fan-out, which divides Publishers into two categories:
As I said above, the semantics must be clear: if I get a Publisher (e.g. as a method argument) then I must know what that means. Saying “it can be used multiple times unless it cannot” violates this constraint. In the second case above there cannot be reuse since the point of this discussion is that we agree that having a fan-out logic in every Publisher is a bad idea. This is a difference to Rx Observable because for Publisher we would have to define how to merge the back pressure streams when splitting the data streams (which we cannot do in a one size fits all fashion). Therefore the only consistent choice is to disallow a Publisher from being reused. Having a source of data which can support multiple sinks is perfectly reasonable, but I don’t think it should be covered by this specification—at least not in the first version. This means that e.g. Observable should have a “toPublisher” method which you can use to get a back pressure aware outlet that can be connected to any Reactive Streams Subscriber, and you would implement it such that you get a new one every time. Would that not solve this issue in a quite natural fashion? |
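Roland's toPublisher suggestion can be sketched in Java as a source that hands out a new, single-use Publisher on every call. This is a minimal, single-threaded sketch under stated assumptions: the nested interfaces mirror the Publisher/Subscriber/Subscription shape proposed later in this thread, NumberSource and toPublisher are hypothetical names, and rejecting a second subscribe with onError is only one possible policy for a single-use Publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a data source that hands out a fresh, single-use Publisher per consumer.
// The interfaces are assumptions modelled on the proposal in this thread;
// NumberSource and toPublisher are hypothetical.
final class SingleUsePublisherDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    static final class NumberSource {
        private final int count;

        NumberSource(int count) { this.count = count; }

        // Each call returns a new Publisher that transports exactly one stream.
        Publisher<Integer> toPublisher() {
            AtomicBoolean used = new AtomicBoolean();
            return s -> {
                if (!used.compareAndSet(false, true)) {
                    // Policy choice for this sketch: reject reuse instead of fanning out.
                    s.onError(new IllegalStateException("Publisher already subscribed"));
                    return;
                }
                s.onSubscribe(new Subscription() {
                    private int next = 0;
                    private boolean done = false;

                    @Override
                    public void signalAdditionalDemand(int n) {
                        // Emit at most n elements, then complete once the range is exhausted.
                        for (int i = 0; i < n && !done && next < count; i++) {
                            s.onNext(next++);
                        }
                        if (!done && next == count) {
                            done = true;
                            s.onCompleted();
                        }
                    }

                    @Override
                    public void cancel() { done = true; }
                });
            };
        }
    }

    // Records every signal it receives; requests ten elements up front.
    static final class Collector implements Subscriber<Integer> {
        final List<String> events = new ArrayList<>();

        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(10); }
        @Override public void onNext(Integer t) { events.add("next " + t); }
        @Override public void onError(Throwable t) { events.add("error " + t.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    static List<List<String>> run() {
        NumberSource source = new NumberSource(3);
        Publisher<Integer> p = source.toPublisher();
        Collector first = new Collector();
        Collector second = new Collector();
        Collector fresh = new Collector();
        p.subscribe(first);
        p.subscribe(second);                   // rejected: p has already been used
        source.toPublisher().subscribe(fresh); // a new Publisher starts the range again
        return List.of(first.events, second.events, fresh.events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each consumer gets its own Publisher from the factory, so a Publisher value never has to answer how several Subscribers' demand would be merged.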
This just means we're forcing people to use a factory pattern for handing out Rx Observables work this way very elegantly. If I have an A The ability to subscribe multiple times to a Multicast means it is sharing a stream. Ability to subscribe multiple times to a Multicast immediately means that a
Here are examples ...
1) Stock Stream - hot, always changing
Publisher: Two independent streams are flowing to The
2) Catalog Stream - cold, starts from beginning each time
Publisher: In this case the
3) File Stream - cold, loads each time
Publisher: In this case the
Multicast vs Multiple Subscribe
A
public class AStreamExample implements Publisher<String> {
public static void main(String[] args) {
AStreamExample stream = new AStreamExample();
MySubscriber s1 = new MySubscriber();
MySubscriber s2 = new MySubscriber();
stream.subscribe(s1);
stream.subscribe(s2);
}
@Override
public void subscribe(final Subscriber<String> subscriber) {
// you wouldn't really implement it like this ... but you get the point
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Subscribed on thread: " + Thread.currentThread());
BooleanSubscription s = new BooleanSubscription();
subscriber.onSubscribe(s);
// ignoring unsubscribe and requestMore behavior for this simple example
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onComplete();
}
}).start(); // start() runs the body on a new thread; run() would execute it on the caller's thread
}
}
A
It is not required to multicast. In fact, it is assumed it will unicast. If a backing stream naturally multicasts (a hot stream of events that never stops), it could choose to multicast as an implementation decision. This ties back to the 3 example use cases above with "hot" and "cold" sources.
If we didn't allow multi-subscribe, then the code would need to look like this:
public static void main(String[] args) {
MySubscriber s1 = new MySubscriber();
MySubscriber s2 = new MySubscriber();
new AStreamExample().subscribe(s1);
new AStreamExample().subscribe(s2);
}
This seems trivial in this case, but put it behind an API and it becomes clearer:
public static void main(String[] args) {
MySubscriber s1 = new MySubscriber();
MySubscriber s2 = new MySubscriber();
getStream().subscribe(s1);
getStream().subscribe(s2);
}
Do I have to call
It gets worse once someone starts passing the stream around.
public void doStuff(AStreamExample stream) {
stream.subscribe(new MySubscriber());
doMoreStuff(stream);
}
public void doMoreStuff(AStreamExample stream) {
// can I subscribe? what if someone else already did?
stream.subscribe(new MySubscriber());
}
Perhaps my original statement was unclear, but I'm not saying a
I suggest we remove the multicast requirement for a |
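The unicast model described here, where each subscribe call on the same instance starts an independent lifecycle, can be sketched as follows for a cold source like the catalog example. It is single-threaded and illustrative only; the interfaces are modelled on the proposal that appears later in this thread, and ListPublisher and Recorder are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the unicast, multi-subscribe model: every subscribe() call starts a new,
// independent lifecycle. Interfaces are assumptions modelled on this thread's proposal;
// ListPublisher and Recorder are hypothetical helpers.
final class ColdPublisherDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    // Cold source: each Subscription starts from the first item.
    static final class ListPublisher<T> implements Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) { this.items = new ArrayList<>(items); }

        @Override
        public void subscribe(Subscriber<T> s) {
            s.onSubscribe(new Subscription() {
                private int index = 0;        // per-subscription state, never shared
                private boolean done = false;

                @Override
                public void signalAdditionalDemand(int n) {
                    while (n-- > 0 && !done && index < items.size()) {
                        s.onNext(items.get(index++));
                    }
                    if (!done && index == items.size()) {
                        done = true;
                        s.onCompleted();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Keeps its Subscription so the caller can drive demand step by step.
    static final class Recorder<T> implements Subscriber<T> {
        final List<String> log = new ArrayList<>();
        Subscription subscription;

        @Override public void onSubscribe(Subscription s) { subscription = s; }
        @Override public void onNext(T t) { log.add(String.valueOf(t)); }
        @Override public void onError(Throwable t) { log.add("error"); }
        @Override public void onCompleted() { log.add("done"); }
    }

    static List<List<String>> run() {
        Publisher<String> catalog = new ListPublisher<>(List.of("a", "b", "c"));
        Recorder<String> fast = new Recorder<>();
        Recorder<String> slow = new Recorder<>();
        catalog.subscribe(fast);
        catalog.subscribe(slow);                      // same instance, second lifecycle
        fast.subscription.signalAdditionalDemand(10); // takes everything at once
        slow.subscription.signalAdditionalDemand(1);  // paced only by its own demand
        slow.subscription.signalAdditionalDemand(1);
        slow.subscription.cancel();                   // ends only the slow lifecycle
        slow.subscription.signalAdditionalDemand(5);  // ignored after cancel
        return List.of(fast.log, slow.log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The per-subscription state (index, done) lives inside each Subscription, which is what lets the same Publisher instance be passed around and subscribed to more than once.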
@benjchristensen, thanks for your elaboration, now it is perfectly clear what we are talking about: you are equating Publisher with Observable, perhaps because of similarities in method names and also due to the existence of Subscription as a name shared between Rx and Reactive Streams. To be very blunt and concise: these names have misled you, and I am very sorry about that. The Publisher/Subscriber pair is designed only for one purpose: to transport exactly one stream of elements across an asynchronous boundary (without unbounded buffering etc.). Therefore a Publisher delivers exactly one stream, which is why multiple subscriptions and multicast are identical in this case. Thinking about it some more, it would probably have been better to leave out the Subscription altogether—we had it since we started out from the position of building in multicast as a convenience feature that we have come to realize is detrimental—which would lead us to equate Publisher with an Rx Subscription, leaving the details of what you want to publish out of the scope of this specification. What you want is to have an easy path towards using Reactive Streams as a way to express remote Observables, and what we want is to use it as a way to push data to an Actor without blowing up message queues, and as far as I know everyone agrees on the underlying basic problem set. I’ll try out what the above proposal would look like, and if it makes sense (i.e. passes my plausibility test) then we can continue the discussion on the resulting pull request. Again, I’m sorry that it took so long for me to realize what the root cause of the misunderstanding was. |
Roland, this is not about Observables (though that heavily influences my perspective as I've spent almost 2 years building "reactive stream" applications in production). Let me equate it with In code, I can pass an Since it's pull based, it creates the The Now we flip to a The point I'm making is two-fold:
Therefore, please set aside comparisons to, or use cases of, Actors and Observables, and consider this as an interface in the JDK for decades to come that everyone will implement. In that case it will no longer be used purely for interop (as it will be now, since it's a 3rd-party dependency). If it becomes part of the JDK it will be implemented directly by classes and be the common interface, like
Code such as the following should be safe to write:
void doStuff() {
Publisher p = new PublisherImpl();
doMoreStuff(p);
p.subscribe(subscriber);
}
void doMoreStuff(Publisher p) {
p.subscribe(subscriber);
}
If we don't allow multi-subscribe behavior, this will instead need to be written with a 3rd level such as:
void doStuff() {
PublisherFactory p = new PublisherFactoryImpl();
doMoreStuff(p);
p.getPublisher().subscribe(subscriber);
}
void doMoreStuff(PublisherFactory p) {
p.getPublisher().subscribe(subscriber);
} |
@benjchristensen What about streams that can naturally only be used once? Like a stream of network data for an already opened network connection? Given your suggestion would it be possible to get a IMO either, you would have to allow that this Producer would instantly call |
It's up to the See the 3 use cases I listed above at #19 (comment) |
Yes, I've seen them. That's why I ask. I don't think an open connection fits into any of those three categories. An already-open network connection (similar to an already generated Iterator) has very different backpressure needs than something that publishes events. "Reuse" is the thing that doesn't work easily, because the backpressure demand of all possible consumers has to be coordinated somehow. So, that's the question: can an open network connection (an iterator, an InputStream) be a |
There are two cases where an open network stream applies.
Assume a "hot" stream where back pressure is not relevant: stock prices, mouse events, system metrics or some other thing that continues to fire at you. You can't say "stop"; all you can say is "I can't take any more". In that case it's up to the developer to choose whether to drop, sample, debounce or buffer (rarely useful in this case). The "requestMore" behavior from a
In a "hot" case like this, a
Summary:
A "cold" source is very different. In this case the
Each
Summary:
In short, the |
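A hot source in this model can be sketched as a Publisher that emits regardless of listeners, gives each Subscriber only the events emitted after it subscribed, and drops events beyond that Subscriber's outstanding demand (the "drop" option mentioned above; sampling or buffering would be other choices). This is a single-threaded illustration; the interfaces follow the proposal later in this thread, and HotTicker and Recorder are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of a "hot" source with per-subscriber demand and a drop strategy.
// Interfaces and names are assumptions for illustration.
final class HotPublisherDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    static final class HotTicker implements Publisher<Integer> {
        private final List<Sub> subs = new CopyOnWriteArrayList<>();

        // Per-subscriber demand counter; each subscriber is paced independently.
        private final class Sub implements Subscription {
            final Subscriber<Integer> target;
            long demand = 0;

            Sub(Subscriber<Integer> target) { this.target = target; }

            @Override public void signalAdditionalDemand(int n) { demand += n; }
            @Override public void cancel() { subs.remove(this); }
        }

        @Override
        public void subscribe(Subscriber<Integer> s) {
            Sub sub = new Sub(s);
            subs.add(sub);
            s.onSubscribe(sub);
        }

        // Called by the (hypothetical) event producer, e.g. a price feed.
        void emit(int value) {
            for (Sub sub : subs) {
                if (sub.demand > 0) {
                    sub.demand--;
                    sub.target.onNext(value);
                }
                // else: no outstanding demand, so this subscriber misses the event
            }
        }
    }

    static final class Recorder implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int initialDemand;

        Recorder(int initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(initialDemand); }
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { }
    }

    static List<List<Integer>> run() {
        HotTicker ticker = new HotTicker();
        ticker.emit(1);                 // no subscribers yet: nobody sees 1
        Recorder a = new Recorder(2);
        ticker.subscribe(a);
        ticker.emit(2);
        ticker.emit(3);
        ticker.emit(4);                 // a asked for only 2, so 4 is dropped for a
        Recorder b = new Recorder(10);
        ticker.subscribe(b);
        ticker.emit(5);                 // a has no demand left; b receives 5
        return List.of(a.received, b.received);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```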
I think I agree with @benjchristensen, as from the subscriber's point of view it shouldn't matter if a Publisher is hot or cold. I like this in Rx Observables. |
Related to this issue is that I think we should be using the The |
@benjchristensen Well, technically the method is called |
I think I'm identifying more with @benjchristensen's comments here. I like the example of It also seems to me that we might benefit from a One might argue that simply attempting to do work and finding out after the fact that none was done would be sufficient, but in many cases that won't work, e.g. when doing least-busy kinds of routing. At the very least we need a way to determine if no work was done so we can decide to do something about it. Even if it's as simple as |
Whatever it's called ... here is the implementation of
public Iterator<E> iterator() {
return new Itr();
} |
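The Iterable analogy can be checked directly: java.util.ArrayList.iterator() returns a new iterator on every call, each with its own cursor, much as each subscribe call would start a new Subscription. A small sketch (IterableFactoryDemo and take are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class IterableFactoryDemo {
    // Pulls up to n elements from an iterator.
    static List<String> take(Iterator<String> it, int n) {
        List<String> out = new ArrayList<>();
        while (n-- > 0 && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    static List<List<String>> run() {
        List<String> list = new ArrayList<>(List.of("x", "y", "z"));
        Iterator<String> first = list.iterator();  // each call creates a new cursor
        Iterator<String> second = list.iterator();
        List<String> a = take(first, 2);            // advances only first
        List<String> b = take(second, 3);           // second still starts at the beginning
        List<String> c = take(first, 3);            // first resumes where it stopped
        return List.of(a, b, c);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```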
@benjchristensen The name is important (otherwise we have had waaay too many naming discussions already, don't you agree?!) My point was that "nobody" would expect a getter to return a new instance each time. And it seems like we agree there?
Publisher.subscribe already creates a new |
@jbrisbin The analogy you are making goes in the wrong direction: the Subscriber does not query the Publisher, it just signals capacity to receive more elements (or “demand”, if we agree on the meaning à la “demand in the market”). A query would be synchronous; if not on the type level then on a semantic level. |
@benjchristensen We are still discussing different things, I think: the prime problem we want to solve is how to pass data across an asynchronous boundary with non-blocking back pressure. Since this is a bidirectional endeavor, allowing multiple participants on one side of the fence by inversion means allowing multiple participants on the other as well. Splitting a stream means merging the back pressure; merging streams means splitting the back pressure. Now, I think it is entirely reasonable to ask the question of “how do I obtain a source for such asynchronous exchange”, but that question is outside the scope of the initial problem. We should first solve the case of connecting both ends of this async pipe. The reason why I persevere in this case is that, plainly speaking, not every iterator is born of an iterable, and your answer of “just drop data” in the other case (which you called “hot”) is by no means the only conceivable one. I might well have an incoming TCP stream which does support back pressure at its source, but which I want to consume in parallel in multiple places. There are many strategies for connecting multiple Subscribers to a logical “Source”, so it just places a useless burden on all Publishers if they must implement one of two specific choices. So, coming back for a second try: what does not work for you with the following proposal?
trait Publisher[T] { // corresponds to Rx Subscription with back pressure
def subscribe(s: Subscriber[T]): Unit
def signalAdditionalDemand(n: Int): Unit
def cancel(): Unit
}
trait Subscriber[T] {
def onSubscribe(p: Publisher[T]): Unit
def onNext(elem: T): Unit
def onError(cause: Throwable): Unit
def onComplete(): Unit
}
// and that would be ALL, nothing else in Reactive Streams (remark: in this case we will want to change the names because PubSub is not 1:1, but that is a discussion to be had afterwards) |
I really like how simple and clean this is and it also solves #25. It does however require a 3rd factory type to generate
I would suggest this change that makes
public interface Publisher<T> {
public void subscribe(Subscriber<T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onCompleted();
}
public interface Subscription {
public void signalAdditionalDemand(int n);
public void cancel();
}
I'm still not 100% convinced the
It allows the
Here is code using those 3 types:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
public static void main(String... args) throws InterruptedException {
Publisher<Integer> dataStream = getData();
dataStream.subscribe(new MySubscriber("A"));
Thread.sleep(750);
dataStream.subscribe(new MySubscriber("B"));
}
static Publisher<Integer> getData() {
return new MyDataPublisher();
}
static class MyDataPublisher implements Publisher<Integer> {
@Override
public void subscribe(final Subscriber<Integer> s) {
AtomicInteger i = new AtomicInteger();
Subscription subscription = new Subscription() {
AtomicInteger capacity = new AtomicInteger();
@Override
public void signalAdditionalDemand(int n) {
System.out.println("signalAdditionalDemand => " + n);
if (capacity.getAndAdd(n) == 0) {
// start sending again if it wasn't already running
send();
}
}
private void send() {
System.out.println("send => " + capacity.get());
// this would normally use an eventloop, actor, whatever
new Thread(() -> {
do {
s.onNext(i.incrementAndGet());
} while (capacity.decrementAndGet() > 0);
}).start();
}
@Override
public void cancel() {
capacity.set(-1);
}
};
s.onSubscribe(subscription);
}
}
static class MySubscriber implements Subscriber<Integer> {
final int BUFFER_SIZE = 10;
private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
private volatile boolean terminated = false;
private final String token;
MySubscriber(String token) {
this.token = token;
}
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe => request " + BUFFER_SIZE);
s.signalAdditionalDemand(BUFFER_SIZE);
startAsyncWork(s);
}
@Override
public void onNext(Integer t) {
buffer.add(t);
}
@Override
public void onError(Throwable t) {
terminated = true;
throw new RuntimeException(t);
}
@Override
public void onCompleted() {
terminated = true;
}
private void startAsyncWork(Subscription s) {
System.out.println("**** Start new worker thread");
/* don't write real code like this! just for quick demo */
new Thread(() -> {
while (!terminated) {
Integer v = buffer.poll();
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
if (buffer.size() < 3) {
s.signalAdditionalDemand(BUFFER_SIZE - buffer.size());
}
if (v != null) {
System.out.println(token + " => Did stuff with v: " + v);
}
}
}).start();
}
}
} |
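One subtlety in the demo above: the worker tops up with BUFFER_SIZE - buffer.size() without subtracting demand that is already outstanding, so if the Publisher is slower than the worker it can request more than the queue has room for, and ArrayBlockingQueue.add then throws IllegalStateException. A single-threaded sketch of a refill rule that counts in-flight demand, assuming the Subscription shape above (Refiller is a hypothetical helper; capacity 10 and low-water mark 3 mirror BUFFER_SIZE and the buffer.size() < 3 check):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a refill rule that never lets buffered + outstanding demand exceed capacity.
// Subscription mirrors this thread's proposal; Refiller is a hypothetical helper.
final class RefillPolicyDemo {
    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    static final class Refiller {
        private final Subscription subscription;
        private final int capacity;
        private final int lowWater;
        private int outstanding = 0; // requested but not yet delivered
        private int buffered = 0;    // delivered but not yet processed

        Refiller(Subscription subscription, int capacity, int lowWater) {
            this.subscription = subscription;
            this.capacity = capacity;
            this.lowWater = lowWater;
        }

        void start() { request(capacity); }

        // Call from onNext: one unit of demand became one buffered element.
        void onDelivered() {
            outstanding--;
            buffered++;
        }

        // Call after the worker has processed one element.
        void onProcessed() {
            buffered--;
            int inFlight = buffered + outstanding;
            if (inFlight < lowWater) {
                request(capacity - inFlight); // top up only the genuinely free space
            }
        }

        int inFlight() { return buffered + outstanding; }

        private void request(int n) {
            outstanding += n;
            subscription.signalAdditionalDemand(n);
        }
    }

    static List<Integer> run() {
        List<Integer> requests = new ArrayList<>();
        Subscription recording = new Subscription() {
            @Override public void signalAdditionalDemand(int n) { requests.add(n); }
            @Override public void cancel() { }
        };
        Refiller refiller = new Refiller(recording, 10, 3);
        refiller.start();                                    // requests 10
        for (int i = 0; i < 10; i++) refiller.onDelivered(); // a fast burst fills the buffer
        for (int i = 0; i < 10; i++) {                       // then the producer stalls
            refiller.onProcessed();
            if (refiller.inFlight() > 10) throw new IllegalStateException("over-requested");
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

While the producer stalls, the rule issues one top-up when in-flight demand drops below the low-water mark and then stays quiet, because the pending request already counts toward capacity.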
The output of the above code is:
|
I do like how this makes things clean and simple as well. I'm playing with a reactive Buffer, so I'll be trying this out. @benjchristensen, is this pushed to your fork yet, or otherwise available for me to test against? /cc @smaldini |
Not a fan of |
No, what is pasted above is the entirety of the code. I purposefully did not make anything depend on RxJava so as to keep this completely separate and clean.
Yes, I'm not sold yet on that or the
Going to follow up in a few minutes with documented APIs that communicate the contract. |
With poorly written docs to try and communicate the contracts:
public interface Publisher<T> {
/**
* Request {@link Subscription} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
* <p>
* Each {@link Subscription} will work for only a single {@link Subscriber}.
* <p>
* A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
*
* @param s
*/
public void subscribe(Subscriber<T> s);
}
/**
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
* <p>
* No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called.
* <p>
* After signaling demand:
* <ul>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#signalAdditionalDemand(int)}</li>
* <li>Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent.
* </ul>
* <p>
* Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more.
*
* @param <T>
*/
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)}
*/
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}.
*
* @param t
*/
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
*
* @param t
*/
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
*/
public void onCompleted();
}
/**
* A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
* <p>
* It can only be used once by a single {@link Subscriber}.
* <p>
* It is used to both signal desire for data and cancel demand (and allow resource cleanup).
*
*/
public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signalled via this method.
* <p>
* It can be called however often and whenever needed.
* <p>
* Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
*
* @param n
*/
public void signalAdditionalDemand(int n);
/**
* Request the {@link Publisher} to stop sending data and clean up resources.
* <p>
* Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous.
*/
public void cancel();
} |
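The contract in these docs can also be checked mechanically. A sketch, assuming the interfaces as documented above (signalAdditionalDemand, onCompleted): CheckingSubscriber is a hypothetical wrapper that counts the demand its delegate signals and records any onNext beyond that demand, or any signal after a terminal onError/onCompleted. It does not flag onNext after cancel, since the docs allow delivery of previously signalled demand.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a Subscriber wrapper that checks the documented contract at runtime.
// Interfaces mirror the ones documented above; CheckingSubscriber is hypothetical.
final class ContractCheckDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    static final class CheckingSubscriber<T> implements Subscriber<T> {
        final List<String> violations = new ArrayList<>();
        private final Subscriber<T> delegate;
        private long demand = 0;
        private boolean terminated = false;

        CheckingSubscriber(Subscriber<T> delegate) { this.delegate = delegate; }

        @Override
        public void onSubscribe(Subscription s) {
            // Interpose on the Subscription so every unit of demand the delegate signals is counted.
            delegate.onSubscribe(new Subscription() {
                @Override public void signalAdditionalDemand(int n) {
                    demand += n;
                    s.signalAdditionalDemand(n);
                }
                @Override public void cancel() { s.cancel(); }
            });
        }

        @Override
        public void onNext(T t) {
            if (terminated) violations.add("onNext after terminal: " + t);
            else if (demand == 0) violations.add("onNext without demand: " + t);
            else demand--;
            delegate.onNext(t);
        }

        @Override public void onError(Throwable t) { terminal("onError"); delegate.onError(t); }
        @Override public void onCompleted() { terminal("onCompleted"); delegate.onCompleted(); }

        private void terminal(String signal) {
            if (terminated) violations.add(signal + " after terminal");
            terminated = true;
        }
    }

    static List<String> run() {
        Subscriber<Integer> consumer = new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) { s.signalAdditionalDemand(2); }
            @Override public void onNext(Integer t) { }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };
        CheckingSubscriber<Integer> checked = new CheckingSubscriber<>(consumer);
        // A deliberately broken Publisher: ignores demand and keeps going after completion.
        Publisher<Integer> broken = s -> {
            s.onSubscribe(new Subscription() {
                @Override public void signalAdditionalDemand(int n) { }
                @Override public void cancel() { }
            });
            s.onNext(1);
            s.onNext(2);
            s.onNext(3); // only 2 were requested
            s.onCompleted();
            s.onNext(4); // after the terminal state
        };
        broken.subscribe(checked);
        return checked.violations;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```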
Ben, this is starting to look a lot like TCP windowing. subscribe takes a window size argument that is used to create/size a buffer |
I have submitted a pull request that I propose we merge to end this particular thread of discussion: #37 |
@benjchristensen damn it ... it seems I deleted your comment (summary) by mistake by clicking the wrong button :( Can someone just copy and paste it from the emails? Sorry for the mess... It seems I cannot "revert" my change. |
Anyway... I really like what @benjchristensen proposed in his summary and I think it makes the contract quite clear, so I would be quite happy to have the proposal merged in. I also like the naming that @mariusaeriksen proposed here, but as @benjchristensen pointed out, it may be better to think about names after we agree on something. Thanks again to all of you for the hard work on this, as I could not keep up with all of it over the last weeks :( |
Roland is mid-air AFAIK and will reply as soon as he can. |
I'm actually not feeling the warm fuzzies about the alternative naming above. To me it reads very IO-oriented which I don't think fits every use case. Although I caveat that by saying: calling things "sources" and "sinks" does have its advantages and is the standardized terminology used in Spring XD. I'm flexible. But I definitely vote for merging the simplification changes (and Gradle build? :D) @benjchristensen proposed so we can get to work using it. |
Looking over the summary that @benjchristensen made (before it disappeared), it seems to hang together quite well. There are some niggly bits with the terms, but the behavior works, I think. It would be good to put the summary back up, if possible.
|
Here's a resurrection of @benjchristensen's comment from GitHub's email:
Since there were no responses over the weekend and this thread is very long and hard to read, I'd like to summarize and ask for folks to weigh in. The proposal is as follows:
Contract
Types
Naming of classes and methods is not part of what is being discussed here. That can be argued over after we agree upon behavior :-)
package org.reactivestreams;
public interface Publisher<T> {
/**
* Request {@link Subscription} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
* <p>
* Each {@link Subscription} will work for only a single {@link Subscriber}.
* <p>
* A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
*
* @param s
*/
public void subscribe(Subscriber<T> s);
}
package org.reactivestreams;
/**
* A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
* <p>
* It can only be used once by a single {@link Subscriber}.
* <p>
* It is used to both signal desire for data and cancel demand (and allow resource cleanup).
*
*/
public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signalled via this method.
* <p>
* It can be called however often and whenever needed.
* <p>
* Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
*
* @param n
*/
public void signalAdditionalDemand(int n);
/**
* Request the {@link Publisher} to stop sending data and clean up resources.
* <p>
* Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous.
*/
public void cancel();
}
package org.reactivestreams;
/**
* Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
* <p>
* No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called.
* <p>
* After signaling demand:
* <ul>
* <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#signalAdditionalDemand(int)}</li>
* <li>Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent.
* </ul>
* <p>
* Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more.
*
* @param <T>
*/
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)}
*/
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}.
*
* @param t
*/
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
*
* @param t
*/
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
*/
public void onCompleted();
} |
Thx
|
Thanks for your feedback @tmontgomery @jbrisbin @mariusaeriksen and @normanmaurer Definitely there are things to continue debating (naming, etc). If you all agree (including @rkuhn once he's available again), I suggest we merge #37 and start new issues to discuss the next round of topics that build on top of what we've agreed upon so far. |
What does that mean? In an asynchronous context, you cannot guarantee that events won't reach the Subscriber concurrently, unless the Subscriber applies back-pressure by means of
While I understand the need for this, maybe it's the Publisher that should ensure that dispatching the next
What this section is basically saying is that the Publisher should respect |
Also related to this ... what happens if we have a Subscriber with an onNext like:
public void onNext(T elem) {
process(elem);
dispatchRequestEvent(100);
}
I don't see anything in the contract of the Subscriber that disallows this, and things get more complicated if the Publisher is allowed to send onNext events following a
TL;DR, I'd like to call
public void onNext(T elem) {
if (isStillValid(elem)) {
process(elem);
subscription.request(1); // acknowledgement, next please
}
else
subscription.cancel(); // no longer interested, stop please
} |
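Calling request(1) from inside onNext, as in the snippet above, is only safe if a synchronous Publisher does not recurse into onNext from within that call. A common way to handle it is a re-entrancy guard (sometimes called trampolining): a nested request only adds to the demand counter, and the outer emission loop picks it up. A single-threaded sketch, assuming the interfaces proposed in this thread (signalAdditionalDemand plays the role of request); RangePublisher, AckSubscriber and the validity limit are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a synchronous Publisher that tolerates demand being signalled from inside onNext.
// Interfaces follow this thread's proposal; RangePublisher and AckSubscriber are hypothetical.
final class ReentrantDemandDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    static final class RangePublisher implements Publisher<Integer> {
        private final int start;
        private final int end; // exclusive

        RangePublisher(int start, int end) { this.start = start; this.end = end; }

        @Override
        public void subscribe(Subscriber<Integer> s) {
            s.onSubscribe(new Subscription() {
                private long demand = 0;
                private int next = start;
                private boolean emitting = false;
                private boolean cancelled = false;
                private boolean completed = false;

                @Override
                public void signalAdditionalDemand(int n) {
                    demand += n;
                    if (emitting) return; // nested call from onNext: the loop below continues
                    emitting = true;
                    while (demand > 0 && !cancelled && next < end) {
                        demand--;
                        s.onNext(next++); // may call signalAdditionalDemand or cancel
                    }
                    if (!cancelled && !completed && next == end) {
                        completed = true;
                        s.onCompleted();
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { cancelled = true; }
            });
        }
    }

    // Requests one element at a time and cancels at the first element >= limit.
    static final class AckSubscriber implements Subscriber<Integer> {
        final List<Integer> processed = new ArrayList<>();
        boolean completed = false;
        private final int limit;
        private Subscription subscription;

        AckSubscriber(int limit) { this.limit = limit; }

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.signalAdditionalDemand(1);
        }

        @Override
        public void onNext(Integer elem) {
            if (elem < limit) {
                processed.add(elem);
                subscription.signalAdditionalDemand(1); // acknowledgement, next please
            } else {
                subscription.cancel();                  // no longer interested, stop please
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { completed = true; }
    }

    static AckSubscriber run(int end, int limit) {
        AckSubscriber sub = new AckSubscriber(limit);
        new RangePublisher(0, end).subscribe(sub);
        return sub;
    }

    public static void main(String[] args) {
        AckSubscriber stopped = run(100_000, 50_000);
        System.out.println(stopped.processed.size() + " processed, completed=" + stopped.completed);
        AckSubscriber full = run(100_000, 200_000);
        System.out.println(full.processed.size() + " processed, completed=" + full.completed);
    }
}
```

Without the emitting flag, each request(1) would call onNext one stack frame deeper, so a long stream could overflow the stack; with it, the stack depth stays constant regardless of stream length.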
The discussion for this is now at #41 where the README is being revised. |
@alexandru please refer to #46 for further discussion on the asynchronous semantics of the interfaces. Since this has been split up into multiple topics, and in light of @benjchristensen’s latest comment, I am closing this issue. |
Currently the spec states "A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them)."
I think it is a mistake to complicate the spec and require implementations to support multicasting and therefore management of subscriptions over time. In fact, I think it should expressly stick to unicast (new lifecycle per Subscription).
Multicasting techniques should be layered on top by libraries, not required of the Publisher instances themselves.
For example, each Subscriber could result in a new network connection, open file, etc. This would be a basic implementation.
In Rx this greatly simplifies things and is a good separation of concerns. Multicasting can be added and done with different behaviors such as replaying all of history, a portion of history, the last value only, or ignoring history and just starting from now onwards, etc.
In other words, someone providing me a Publisher should not concern themselves with multiple subscriptions, how I want to multicast or other such things. If I subscribe, it should start a new lifecycle, emit the data as I request it and stop when I unsubscribe.
This keeps the mental model clear, the Publisher
implementations simple and allows libraries a simple contract to interface with.
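As an illustration of multicasting layered on top by a library rather than required of every Publisher, here is a sketch of one such policy: a single upstream Subscription shared by several downstream Subscribers, paced by the slowest downstream demand, with no replay for late subscribers. Everything here is an assumption for illustration: the interfaces follow the shape proposed in this thread, Multicaster and range are hypothetical, and replay-all, replay-last or drop-based variants would be equally valid library choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of library-level multicast on top of a unicast Publisher: one upstream Subscription,
// many downstream Subscribers, paced by the slowest downstream demand, no replay.
// Interfaces follow this thread's proposal; Multicaster and range are hypothetical.
final class MulticastDemo {
    interface Publisher<T> { void subscribe(Subscriber<T> s); }
    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }
    interface Subscription { void signalAdditionalDemand(int n); void cancel(); }

    static final class Multicaster<T> implements Publisher<T>, Subscriber<T> {
        private final class Downstream implements Subscription {
            final Subscriber<T> target;
            long demand = 0;
            Downstream(Subscriber<T> target) { this.target = target; }
            @Override public void signalAdditionalDemand(int n) { demand += n; maybeRequestUpstream(); }
            @Override public void cancel() { downstreams.remove(this); maybeRequestUpstream(); }
        }

        private final List<Downstream> downstreams = new CopyOnWriteArrayList<>();
        private Subscription upstream;
        private long outstanding = 0;                             // requested upstream, not yet received
        final List<Integer> upstreamRequests = new ArrayList<>(); // recorded for the demo only

        @Override public void subscribe(Subscriber<T> s) {
            Downstream d = new Downstream(s);
            downstreams.add(d);
            s.onSubscribe(d);
        }

        @Override public void onSubscribe(Subscription s) { upstream = s; maybeRequestUpstream(); }

        @Override public void onNext(T t) {
            outstanding--;
            for (Downstream d : downstreams) {
                // A subscriber that joined while elements were in flight may have no demand yet.
                if (d.demand > 0) { d.demand--; d.target.onNext(t); }
            }
        }

        @Override public void onError(Throwable t) { for (Downstream d : downstreams) d.target.onError(t); }
        @Override public void onCompleted() { for (Downstream d : downstreams) d.target.onCompleted(); }

        // Ask upstream only for what every current downstream can accept.
        private void maybeRequestUpstream() {
            if (upstream == null || downstreams.isEmpty()) return;
            long min = Long.MAX_VALUE;
            for (Downstream d : downstreams) min = Math.min(min, d.demand);
            long delta = min - outstanding;
            if (delta > 0) {
                int n = (int) Math.min(delta, Integer.MAX_VALUE);
                outstanding += n;
                upstreamRequests.add(n);
                upstream.signalAdditionalDemand(n);
            }
        }
    }

    // Cold upstream emitting 0..count-1 on demand (not re-entrancy safe; fine for this demo).
    static Publisher<Integer> range(int count) {
        return s -> s.onSubscribe(new Subscription() {
            private int next = 0;
            private boolean done = false;
            @Override public void signalAdditionalDemand(int n) {
                for (int i = 0; i < n && !done && next < count; i++) s.onNext(next++);
                if (!done && next == count) { done = true; s.onCompleted(); }
            }
            @Override public void cancel() { done = true; }
        });
    }

    static final class Recorder implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Subscription subscription;
        @Override public void onSubscribe(Subscription s) { subscription = s; }
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { }
    }

    static List<List<Integer>> run() {
        Multicaster<Integer> hub = new Multicaster<>();
        Recorder a = new Recorder();
        Recorder b = new Recorder();
        hub.subscribe(a);
        hub.subscribe(b);
        range(100).subscribe(hub);                // exactly one upstream subscription
        a.subscription.signalAdditionalDemand(5); // min(5, 0) = 0: nothing requested yet
        b.subscription.signalAdditionalDemand(2); // min(5, 2) = 2: request 2 upstream
        b.subscription.signalAdditionalDemand(3); // min(3, 3) = 3: request 3 more
        return List.of(a.received, b.received, hub.upstreamRequests);
    }

    public static void main(String[] args) { run().forEach(System.out::println); }
}
```

The upstream Publisher stays a plain unicast source; all fan-out state (per-subscriber demand, the merged upstream request) lives in the library-level Multicaster, which is the separation this issue argues for.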