Skip to content

Inherit IObservable, IObserver for IPublisher, ISubscriber #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
buybackoff opened this issue Jun 22, 2016 · 17 comments
Closed

Inherit IObservable, IObserver for IPublisher, ISubscriber #10

buybackoff opened this issue Jun 22, 2016 · 17 comments

Comments

@buybackoff
Copy link

buybackoff commented Jun 22, 2016

I believe IPublisher and ISubscriber should inherit from IObservable and IObserver. Also, ISubscription should implement IDisposable and Dispose() should be used instead of Cancel(). I played with reactive streams recently here and realized that a standalone implementation of RX Streams makes it impossible to use existing Rx.NET library. However, if IPublisher inherits IObservable, we could do type check and adjust consumers behavior accordingly. So, if a consumer supports reactive streams, it should use Request(n), otherwise Request(Int.MaxValue) should be implicitly called so that IPublisher behaves the same way as IObservable currently does. Here is an unfinished example of how to fallback to old IObservable behavior when IObserver is not ISubscriber. I believe I have seen similar patterns is Java implementation or discussions.

IObservable/IObserver are a part of the standard library, so keeping binary compatibility is quite important.

@viktorklang
Copy link
Contributor

Hi @buybackoff!

How do you propose to deal with the contracts/spec incompatibilities for the methods? (especially for ISubscriber's signals)

Wouldn't AsIObservable on ISubscriber and an AsIObserver on Publisher be less ambiguous?

@buybackoff
Copy link
Author

Aliases could probably work. There is mainly Cancel vs Dispose difference.
On Jun 22, 2016 6:15 PM, "Viktor Klang (√)" [email protected]
wrote:

Hi @buybackoff https://github.com/buybackoff!

How do you propose to deal with the contracts/spec incompatibilities for
the methods? (especially for ISubscriber's signals)

Wouldn't AsIObservable on ISubscriber and an AsIObserver on Publisher be
less ambiguous?


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fEnR85YFWI4tAyxJFRkuIO8pRK5gks5qOVGDgaJpZM4I73d_
.

@viktorklang
Copy link
Contributor

@buybackoff How would that be a problem, you mean?

@buybackoff
Copy link
Author

I mean, there are only two differences: Subscribe signature is void in RS
but returns IDisposable in .NET. And Dispose works like Cancel, so one
could be an alias to another. Then subscription has an additional method
Request, and Subscriber has an additional method OnSubscribe. Everything
else is identical. Return value of Subscribe could be ignored is this is
the same instance that is passed into OnSubscribe method.
On Jun 22, 2016 6:29 PM, "Viktor Klang (√)" [email protected]
wrote:

@buybackoff https://github.com/buybackoff How would that be a problem,
you mean?


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fEeTFj7HwIU5FYzqjnaWnq9A6kbRks5qOVTdgaJpZM4I73d_
.

@cconstantin
Copy link
Contributor

cconstantin commented Jun 22, 2016

@buybackoff as I understand it, the reactive streams interfaces target interoperability between compliant implementations, but they are not necessarily intended to be exposed to users of those implementations. They should be minimal, so they don't limit the choices of implementors.

In akka streams we need a non-generic version of IPublisher and ISubscriber in order to get around variance conflicts, and in an early iteration of reactive-streams-dotnet we had IPublisher<> inherit from IPublisher. This in turn complicated the implementation of an adapter library to RX.NET.

I'd rather not go down that path again.

Implementors can easily wrap or adapt these minimalistic interfaces in order to provide additional functionality.

@buybackoff
Copy link
Author

Well, interfaces support multiple inheritance. IPublisher<> could inherit
both from IPublisher and IObservable.
On Jun 22, 2016 6:36 PM, "Chris Constantin" [email protected]
wrote:

@buybackoff https://github.com/buybackoff as I understand it, the
reactive streams interfaces target interoperability between compliant
implementations, but they are not necessarily intended to be exposed to
users of those implementations. They should be minimal, so they don't limit
the choices of implementors.

In akka streams we need a non-generic version of IPublisher and
ISubscriber in order to get around variance conflicts, and in an early
iteration of reactive-streams-dotnet we had IPublisher<> inherit from
IPublisher. This in turn complicated the implementation of an adapter
library to RX.NET.

I'd rather not go down that path again.


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fIG7WopX6ON-xK8cpsuYEIygCUGHks5qOVaMgaJpZM4I73d_
.

@cconstantin
Copy link
Contributor

cconstantin commented Jun 22, 2016

Yes, but it's quite invasive, in the sense that it increases the burden and constraints on the implementors. And the benefit is minimal, imho.

@buybackoff
Copy link
Author

But without binary compatibility we lose ability to use everything that
depends on IObservable, and fallback to implementations that do not support
Request and backpressure is easy to do in base classes.
On Jun 22, 2016 6:41 PM, "Chris Constantin" [email protected]
wrote:

Yes, but it's quite invasive, in the sense that it increases the burden
and constraints on the implementors.


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fAWhLMGmQbH8y1G0BeWru7Q182Ebks5qOVeqgaJpZM4I73d_
.

@cconstantin
Copy link
Contributor

You can easily adapt observables to reactive streams interfaces and vice-versa I would think. See this as an example: https://github.com/akarnokd/RxAdvancedFlow/blob/c1eb769a06d4118bd340cec8e2558bc3a9ae1240/RxAdvancedFlow/Flowable.cs#L2241

@buybackoff
Copy link
Author

What about implicit operators in some base classes? I mean, it would be
nice to have a reference implementation as a part of this project, and that
implementation should be interoperable with existing Rx stuff.
On Jun 22, 2016 7:04 PM, "Chris Constantin" [email protected]
wrote:

You can easily adapt observables to reactive streams interfaces and
vice-versa I would think. See this as an example:
https://github.com/akarnokd/RxAdvancedFlow/blob/c1eb769a06d4118bd340cec8e2558bc3a9ae1240/RxAdvancedFlow/Flowable.cs#L2241


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fFMOXKQ8kJVEkxHtTVJfBjzsoeaCks5qOVz9gaJpZM4I73d_
.

@marcpiechura
Copy link
Contributor

@buybackoff
A example implementation will be available as well as the TCK when #9 is merged.

I would also prefer to have a simple library that provides the interop API and leave these interfaces as they are, the same way the JVM handles this problem.
Because we would also need to update the TCK to verify that who ever implements the interfaces handles the interop correctly, especially the fact that RX.Net doesn't provide backpressure.
Also I don't like the fact that the .Net interfaces would be different to the ones from the other languages, that would complicate ports from existing Reactive Streams implementations.

Maybe I'm missing something but wouldn't be the only advantage a saved call to ToObservable,... ?

@buybackoff
Copy link
Author

You are right that it is easy to turn Publishers/Subscribers into
Observables/Observers. I am confused by the fact that they are - by the
very meaning and semantics - the same, with just backpressure added,
therefore it is very natural that the former just extends the later, not
replaces or provides an alternative.

On Thu, Jun 23, 2016 at 5:20 PM, Marc Piechura [email protected]
wrote:

@buybackoff https://github.com/buybackoff
A example implementation will be available as well as the TCK when #9
#9 is
merged.

I would also prefer to have a simple library that provides the interop API
and leave these interfaces as they are, the same way the JVM handles this
problem.
Because we would also need to update the TCK to verify that who ever
implements the interfaces handles the interop correctly, especially the
fact that RX.Net doesn't provide backpressure.
Also I don't like the fact that the .Net interfaces would be different to
the ones from the other languages, that would complicate ports from
existing Reactive Streams implementations.

Maybe I'm missing something but wouldn't be the only advantage a saved
call to ToObservable,... ?


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#10 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAd5fFZZM_BO57ERDDGILdWN5EuSnXY4ks5qOpYXgaJpZM4I73d_
.

@marcpiechura
Copy link
Contributor

Fair point, maybe @viktorklang can explain the reasons why these new interfaces were created instead of using the ones from RX.

I think it's not only the backpressure but also the fact that RX was designed as push based model while the reactive streams provide a dynamic push/pull behavior via the demand which is send upstream.

@akarnokd
Copy link
Contributor

The problem with the IObservable.Subscribe is that it doesn't allow synchronous cancellation of the stream and forces all sorts of async swing-arounds in Rx.NET, degrading performance by 10-100x in some cases. Here is an example how to covert from an IObserable and how to appear as IObservable. Also Rx.NET doesn't support backpressure.

@buybackoff
Copy link
Author

Could you please elaborate on 10-100x perf difference? We have agreed that conversion is an easy thing to do even without inheritance and could close this issue, but I do not understand this perf point.

@akarnokd
Copy link
Contributor

Sure, the following program benchmarks the most basic operation: loop and observe 1M integers:

using Reactor.Core;
using System;
using System.Reactive.Linq;

namespace ConsoleApplication1
{
    class Program
    {
        static int sink;

        static void Rx()
        {
            Observable.Range(1, 1000000).Subscribe(v => { sink = v; });
        }

        static void Rs()
        {
            Flux.Range(1, 1000000).Subscribe(v => { sink = v; });
        }

        static void Throughput(string name, Action a)
        {
            var start = DateTimeOffset.UtcNow;
            var end = start.AddSeconds(1);

            long ops = 0;

            while (end > DateTimeOffset.UtcNow)
            {
                a();
                ops++;
            }

            Console.Write(name);
            Console.Write(": ");
            Console.Write(ops * 1000 / (DateTimeOffset.UtcNow - start).TotalMilliseconds);
            Console.WriteLine(" ops/s");
        }

        static void Main(string[] args)
        {
            Throughput("Reactor.Core", Rs);
            Throughput("Rx.NET", Rx);
            Console.ReadKey();
        }
    }
}

The results on my weak laptop with Celeron 1005:

Reactor.Core: 46,62431525497 ops/s
Rx.NET: 0,594849955642039 ops/s

No wonder it has so much overhead: it schedules itself one-by-one.

@cconstantin
Copy link
Contributor

This can be closed @viktorklang, there's agreement this is not necessary.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants