Inherit IObservable, IObserver for IPublisher, ISubscriber #10
Comments
Hi @buybackoff! How do you propose to deal with the contracts/spec incompatibilities for the methods? (especially for ISubscriber's signals) Wouldn't AsIObservable on ISubscriber and an AsIObserver on Publisher be less ambiguous? |
Aliases could probably work. There is mainly Cancel vs Dispose difference.
|
@buybackoff How would that be a problem, you mean? |
I mean, there are only two differences: Subscribe signature is void in RS
|
@buybackoff as I understand it, the reactive streams interfaces target interoperability between compliant implementations, but they are not necessarily intended to be exposed to users of those implementations. They should be minimal, so they don't limit the choices of implementors. In akka streams we need a non-generic version of I'd rather not go down that path again. Implementors can easily wrap or adapt these minimalistic interfaces in order to provide additional functionality. |
Well, interfaces support multiple inheritance. IPublisher<> could inherit
|
Yes, but it's quite invasive, in the sense that it increases the burden and constraints on the implementors. And the benefit is minimal, imho. |
But without binary compatibility we lose ability to use everything that
|
You can easily adapt observables to reactive streams interfaces and vice-versa I would think. See this as an example: https://github.com/akarnokd/RxAdvancedFlow/blob/c1eb769a06d4118bd340cec8e2558bc3a9ae1240/RxAdvancedFlow/Flowable.cs#L2241 |
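As a rough illustration of the adaptation mentioned above, the sketch below wraps a publisher as an `IObservable<T>`. It is a minimal sketch, not the code behind the link: the interfaces are simplified local copies (variance annotations omitted), and `PublisherObservable`, `ListPublisher`, and `RecordingObserver` are hypothetical names introduced only for this example. It assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Simplified local copies of the Reactive Streams interfaces (variance annotations
// omitted) so the sketch compiles without the Reactive.Streams package.
public interface ISubscription { void Request(long n); void Cancel(); }
public interface ISubscriber<T>
{
    void OnSubscribe(ISubscription subscription);
    void OnNext(T element);
    void OnError(Exception cause);
    void OnComplete();
}
public interface IPublisher<T> { void Subscribe(ISubscriber<T> subscriber); }

// Hypothetical adapter: exposes an IPublisher<T> as an IObservable<T>.
// Not thread-safe; assumes signals and Dispose() arrive on one thread.
public sealed class PublisherObservable<T> : IObservable<T>
{
    private readonly IPublisher<T> source;
    public PublisherObservable(IPublisher<T> source) { this.source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var bridge = new Bridge(observer);
        source.Subscribe(bridge);
        return bridge;
    }

    // Acts as the ISubscriber upstream and as the IDisposable handed to the caller.
    private sealed class Bridge : ISubscriber<T>, IDisposable
    {
        private readonly IObserver<T> observer;
        private ISubscription upstream;
        private bool disposed;
        public Bridge(IObserver<T> observer) { this.observer = observer; }

        public void OnSubscribe(ISubscription subscription)
        {
            upstream = subscription;
            if (disposed) subscription.Cancel();
            else subscription.Request(long.MaxValue); // unbounded demand: plain push
        }
        public void OnNext(T element) { if (!disposed) observer.OnNext(element); }
        public void OnError(Exception cause) { if (!disposed) observer.OnError(cause); }
        public void OnComplete() { if (!disposed) observer.OnCompleted(); }

        // IDisposable.Dispose() maps onto ISubscription.Cancel().
        public void Dispose()
        {
            if (disposed) return;
            disposed = true;
            upstream?.Cancel();
        }
    }
}

// Hypothetical helpers used only to exercise the adapter.
public sealed class ListPublisher<T> : IPublisher<T>
{
    private readonly IReadOnlyList<T> items;
    public ListPublisher(params T[] items) { this.items = items; }
    public void Subscribe(ISubscriber<T> subscriber) =>
        subscriber.OnSubscribe(new Sub(items, subscriber));

    private sealed class Sub : ISubscription
    {
        private readonly IReadOnlyList<T> items;
        private readonly ISubscriber<T> subscriber;
        private int index;
        private bool cancelled;
        public Sub(IReadOnlyList<T> items, ISubscriber<T> subscriber)
        { this.items = items; this.subscriber = subscriber; }

        // Simplified: honours demand but does not guard against re-entrant Request calls.
        public void Request(long n)
        {
            while (n-- > 0 && index < items.Count && !cancelled) subscriber.OnNext(items[index++]);
            if (index == items.Count && !cancelled) { cancelled = true; subscriber.OnComplete(); }
        }
        public void Cancel() { cancelled = true; }
    }
}

public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) => Log.Add($"{value}");
    public void OnError(Exception error) => Log.Add("error");
    public void OnCompleted() => Log.Add("done");
}
```

The adapter requests `long.MaxValue` up front, so the wrapped publisher behaves like a classic push-based observable, and disposing the returned handle cancels the upstream subscription.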
What about implicit operators in some base classes? I mean, it would be
|
@buybackoff I would also prefer to have a simple library that provides the interop API and leave these interfaces as they are, the same way the JVM handles this problem. Maybe I'm missing something, but wouldn't the only advantage be a saved call to ToObservable, ...? |
You are right that it is easy to turn Publishers/Subscribers into On Thu, Jun 23, 2016 at 5:20 PM, Marc Piechura
|
Fair point, maybe @viktorklang can explain the reasons why these new interfaces were created instead of using the ones from RX. I think it's not only the backpressure but also the fact that RX was designed as a push-based model, while Reactive Streams provide dynamic push/pull behavior via the demand that is sent upstream. |
The problem with the |
Could you please elaborate on 10-100x perf difference? We have agreed that conversion is an easy thing to do even without inheritance and could close this issue, but I do not understand this perf point. |
Sure, the following program benchmarks the most basic operation: loop and observe 1M integers:

    using Reactor.Core;
    using System;
    using System.Reactive.Linq;

    namespace ConsoleApplication1
    {
        class Program
        {
            // Written on every item so the subscription has an observable side effect.
            static int sink;

            static void Rx()
            {
                Observable.Range(1, 1000000).Subscribe(v => { sink = v; });
            }

            static void Rs()
            {
                Flux.Range(1, 1000000).Subscribe(v => { sink = v; });
            }

            // Runs the action repeatedly for about one second and prints runs per second.
            // Each run ("op") pushes 1M integers through the pipeline.
            static void Throughput(string name, Action a)
            {
                var start = DateTimeOffset.UtcNow;
                var end = start.AddSeconds(1);
                long ops = 0;
                while (end > DateTimeOffset.UtcNow)
                {
                    a();
                    ops++;
                }
                Console.Write(name);
                Console.Write(": ");
                Console.Write(ops * 1000 / (DateTimeOffset.UtcNow - start).TotalMilliseconds);
                Console.WriteLine(" ops/s");
            }

            static void Main(string[] args)
            {
                Throughput("Reactor.Core", Rs);
                Throughput("Rx.NET", Rx);
                Console.ReadKey();
            }
        }
    }

The results on my weak laptop with Celeron 1005:
No wonder it has so much overhead: it schedules itself one-by-one. |
This can be closed @viktorklang, there's agreement this is not necessary. |
I believe IPublisher and ISubscriber should inherit from IObservable and IObserver. Also, ISubscription should implement IDisposable, and Dispose() should be used instead of Cancel(). I played with reactive streams recently here and realized that a standalone implementation of RX Streams makes it impossible to use the existing Rx.NET library. However, if IPublisher inherits IObservable, we could do a type check and adjust the consumer's behavior accordingly. So, if a consumer supports reactive streams, it should use Request(n); otherwise Request(Int.MaxValue) should be implicitly called so that IPublisher behaves the same way as IObservable currently does. Here is an unfinished example of how to fall back to the old IObservable behavior when IObserver is not ISubscriber. I believe I have seen similar patterns in the Java implementation or discussions. IObservable/IObserver are a part of the standard library, so keeping binary compatibility is quite important.
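The type-check fallback described in the proposal could look roughly like the sketch below. It is only an illustration under stated assumptions: the interface shapes follow the proposal (inheriting from `IObservable`/`IObserver`/`IDisposable`) and are not the interfaces the Reactive Streams .NET project ships, `long.MaxValue` is used for unbounded demand, and `RangePublisher`, `PlainObserver`, and `TwoItemSubscriber` are hypothetical names. It assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical interface shapes following the proposal; these are NOT the
// interfaces the Reactive Streams .NET project actually ships.
public interface ISubscription : IDisposable { void Request(long n); } // Dispose() replaces Cancel()
public interface ISubscriber<T> : IObserver<T> { void OnSubscribe(ISubscription subscription); }
public interface IPublisher<T> : IObservable<T> { }

public sealed class RangePublisher : IPublisher<int>
{
    private readonly int start, count;
    public RangePublisher(int start, int count) { this.start = start; this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new RangeSubscription(observer, start, start + count);
        if (observer is ISubscriber<int> subscriber)
            subscriber.OnSubscribe(subscription);   // demand-aware consumer calls Request(n) itself
        else
            subscription.Request(long.MaxValue);    // plain observer: implicit unbounded demand
        return subscription;
    }

    private sealed class RangeSubscription : ISubscription
    {
        private readonly IObserver<int> observer;
        private readonly int end;
        private int next;
        private long requested;
        private bool emitting, done;

        public RangeSubscription(IObserver<int> observer, int start, int end)
        { this.observer = observer; next = start; this.end = end; }

        public void Request(long n)
        {
            if (n <= 0 || done) return;  // simplified: the spec asks for an OnError signal on n <= 0
            requested = requested > long.MaxValue - n ? long.MaxValue : requested + n;
            if (emitting) return;        // re-entrant call from OnNext; the outer loop drains it
            emitting = true;
            while (requested > 0 && next < end && !done)
            {
                requested--;
                observer.OnNext(next++);
            }
            emitting = false;
            if (next >= end && !done) { done = true; observer.OnCompleted(); }
        }

        public void Dispose() { done = true; }
    }
}

// Plain Rx-style consumer: never sees a subscription, receives everything.
public sealed class PlainObserver : IObserver<int>
{
    public readonly List<int> Seen = new List<int>();
    public bool Completed;
    public void OnNext(int value) => Seen.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}

// Demand-aware consumer: asks for two items and then stops requesting.
public sealed class TwoItemSubscriber : ISubscriber<int>
{
    public readonly List<int> Seen = new List<int>();
    public bool Completed;
    public void OnSubscribe(ISubscription subscription) => subscription.Request(2);
    public void OnNext(int value) => Seen.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```

With this shape, a `PlainObserver` subscribed to `new RangePublisher(1, 5)` receives all five values and a completion signal, while a `TwoItemSubscriber` receives only the two values it requested.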