Remove Processor Interface #22

Closed

benjchristensen opened this issue Apr 15, 2014 · 54 comments

Comments

@benjchristensen
Contributor

What is the purpose of Processor being in the API?

Based on my reading of the code and understanding of the intents of this API, I suggest removing the Processor class: https://github.com/reactive-streams/reactive-streams/blob/master/spi/src/main/java/org/reactivestreams/api/Processor.java

It does not seem relevant to the purpose of this project. Libraries such as Akka, RxJava, Reactor, Finagle, etc. can and will layer logic on top of the streams of data and integrate via the Subscriber class to receive and transform data.

@viktorklang
Contributor

It is crucial to be able to represent a detached "stage" (having an inlet and an outlet), and since Java has a, erm, "rustic" type system, you cannot represent Consumer[T] with Producer[T], which means that you need an interface that extends both.
Hence, Processor.
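For reference, a minimal sketch of the kind of unifying type being described (names and type-parameter arity are illustrative, not the exact v0.3 definition):

    // Illustrative only: a single nominal type that is both ends of a stage.
    // It adds no methods of its own; it merely unifies the two contracts.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }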

@benjchristensen
Contributor Author

Please provide some pseudo code as I don't see how the interop goals have anything to do with detached stages.

@smaldini
Contributor

I will feed the thread with some reactor code which implements Processor.


@viktorklang
Contributor

Pseudo:

   def wrap[T](p: Processor[T]): Processor[T] = doSomethingBefore() produceTo p produceTo doSomethingAfter()

@benjchristensen
Contributor Author

Sorry I'm so slow, but I'm still not seeing what that example has to do with interop between libraries.

Can you give me an example of how Akka -> Rx -> Reactor would need Processor?

@viktorklang
Contributor

I'm thinking:

Rx -> AnyGenericReactiveStreamsImpl -> Rx

as in:

  def splice[T](anyGenericReactiveStreamsImpl: Processor[T]) =
    doRxyThings() subscribe anyGenericReactiveStreamsImpl subscribe doMoarRxyThings()

As mentioned before, the only reason it is needed is that Scala is the only(?) JVM language with a type system that supports saying Subscriber with Publisher without requiring that to be a concrete type, i.e. interface Processor extends Subscriber with Publisher.

The alternative encoding would be to do something runtime-fail-y like:

   def splice[T](genericSubscriber: Subscriber[T], genericPublisher: Publisher[T]) = {
       assert(genericSubscriber == genericPublisher, "You broke it")
       doRxyThings() subscribe genericSubscriber
       genericPublisher subscribe doMoarRxyThings()
    }
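Rendered in Java (with rxHead/rxTail as illustrative stand-ins for doRxyThings()/doMoarRxyThings(), and using the Publisher/Subscriber interfaces discussed in this thread), the runtime-fail-y encoding would look roughly like this:

    static <T> void splice(Publisher<T> rxHead,
                           Subscriber<T> genericSubscriber,
                           Publisher<T> genericPublisher,
                           Subscriber<T> rxTail) {
        // Java cannot express "Subscriber<T> with Publisher<T>" as a type, so the
        // invariant that both parameters are the same stage is only a runtime check.
        if (genericSubscriber != genericPublisher) {
            throw new IllegalArgumentException("You broke it");
        }
        rxHead.subscribe(genericSubscriber);
        genericPublisher.subscribe(rxTail);
    }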

Thoughts?

@benjchristensen
Contributor Author

Would it be correct to say that the only reason this would be needed is if we want to define interoperable transformations? If so, that's beyond the original scope of allowing interop of streams and moves into interoperable stream processing.

@viktorklang
Contributor

@benjchristensen No, it's just a consequence of a Publisher also being able to be a Subscriber.

@rkuhn
Member

rkuhn commented Apr 17, 2014

I think I understand what Ben means: passing around a Processor means that I give someone a piece of hose to fit into a pipeline, which emphasizes the transformation being passed around. We have such a case in our TCP code: when you connect outwards you get back a Processor, since you can view the server end as a transformation you send ByteStrings through. It would not be a big difference to say that you get a pair of Publisher and Subscriber instead, I think.

The main question here is whether or not we will see many incompatible types being created in implementations, and I think the answer is yes: if you provide a DSL for creating some asynchronous processing steps, then the “create me the machine” method must return both interfaces at once (with Scala being the only possible exception).

By not providing this type in the SPI we will foster the creation of many copies that only interoperate single-sidedly (i.e. you can pass an Akka Processor as either a Publisher or a Subscriber, but there is no shared Processor type).

I would argue for keeping this unification since most implementations will otherwise create one.

@benjchristensen
Contributor Author

This conversation has long since been superseded by discussions in #37 and #19

@benjchristensen
Contributor Author

Reopening as this discussion has been revived: #56 (comment)

@benjchristensen
Contributor Author

@smaldini

I am regretting the loss of Processor but we can live without it for now. Not sure why that one has been ditched tho.

@benjchristensen

Why does Processor need to be part of the spec? Each library will have different ways of applying operators, processors, whatever. Interop will occur naturally for a Processor of any library that implements the correct Subscriber/Publisher interfaces.

@smaldini

It's just that it is very convenient for combining libraries together or modularising streams, nothing about forcing a given transformation. If a library provides an arbitrary operation, I'd just like to be able to lift a current publisher with it and still retrieve its output, like a logical RequestReply pattern (in fact it would look this way in a TCP application of the spec).

@benjchristensen
Contributor Author

I envision it like this, where only the reactiveStream is defined by the spec:

For example in RxJava I could do this:

Observable.from(reactiveStream).lift(processor_operator_function).subscribe(function)

The lift capability of Rx is not something I expect other libraries to adopt, nor should we impose it on the spec.

The original Processor as defined in v0.3 is basically what Rx calls a Subject, as it implements both Subscriber and Publisher. It is nothing more than that. If that's all it is, we don't need a special type for that, anything that happens to implement both of those magically becomes this "thing" (which I view as a Subject and not a Processor).

In Rx a Processor is very different from a Subject. This is the signature:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

versus a Subject:

public abstract class Subject<T, R> extends Observable<R> implements Observer<T>
... or in reactive-stream terminology ...
public interface Subject<T, R> extends Publisher<R>, Subscriber<T>

This Operator type is more interesting than Subject as it is something new, not just a combination of Publisher and Subscriber. So if we are going to make a Processor part of the interface, I'd argue it needs to be like the Operator above. We wouldn't want to rely on a function type, and the JVM doesn't have a standard for that, so we'd need to just create a full definition such as:

public interface Processor<R, T> {
    public Subscriber<? super T> call(final Subscriber<? super R> o);
}

But then, for this to be standardized across all libraries (the only point of including Processor in the spec), we'd need to add a lift (ignore name bike shedding for now) method to Publisher.
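A hypothetical sketch (not part of any draft) of what that would entail, written as a static helper over the Operator-shaped Processor defined above:

    // Hypothetical: a real spec change would put something like this on Publisher itself.
    static <T, R> Publisher<R> lift(Publisher<T> upstream, Processor<R, T> operator) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> downstream) {
                // Wrap the downstream subscriber, then subscribe the wrapped
                // subscriber to the upstream publisher.
                upstream.subscribe(operator.call(downstream));
            }
        };
    }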

At that point I feel like we are dictating far too much about how implementations will allow processing of data, or whether they'll even allow user defined operators, or only the ones provided by their library.

Thus the Subject-style Processor is simple enough as to not need a definition, and the Operator-style Processor requires dictating too much of the implementation. For these reasons I don't think we should include a Processor interface in the spec, even though both are useful and important to library implementations.

@smaldini
Contributor

Cheers Ben for the re-open and the comment. From an operator perspective it makes sense; however, I think this is not strictly speaking an operator, or maybe it is a Reactive Operator. I called it lift because functionally speaking this seems to achieve the same goal somehow.
A Processor is a function combinator (like the mathematical counterpart) and is a way to compose streams, not to define an operation in particular. In fact it can hide N operators under the hood, but the point is that this combination also fulfils the contract. I would also dub it "Reactive Server" or "Reactive Blackbox" or "(f o g)(x) = f(g(x))", as its purpose is to hide the implementation, offering a reactive input (onNext etc.) and a reactive output (subscribe()).

Processor<Integer, Integer> identityProcessor =
    Action.<Integer, Integer>map(integer -> integer)
        .distinctUntilChanged()
        .scan(Tuple2<Integer, Integer>::getT1)
        .filter(integer -> integer >= 0)
        .combine();
        // combine() creates an Action (implements Processor) whose subscribe delegates to its downstream Publisher
        // and whose onNext/onError/onSubscribe/onComplete delegate to its upstream Subscriber

Streams
    .defer(Arrays.asList(1, 2, 3))
    .connect(identityProcessor) // could be AkkaFlow, Subject (?), Action
    .subscribe(reactOnIdentityProcessorSubscriber);

In my mind the purpose is to have the flexibility to package the head and tail of a Stream under a single object, in a coherent way across implementations. I stand corrected tho; having played with it the past couple of months, it looked like it fitted well for a relatively free cost. Since it is maybe more niche than Publisher/Subscriber, could it be optional for a given implementation? I am pretty sure this could be appreciated by a few tho, especially in a distributed environment where you could hide the request-reply / reply-to pattern under this simple interface.

@danarmak

@benjchristensen, you said:

The original Processor as defined in v0.3 is basically what Rx calls a Subject, as it implements both Subscriber and Publisher. It is nothing more than that. If that's all it is, we don't need a special type for that, anything that happens to implement both of those magically becomes this "thing"

In Scala we can declare a type Subscriber with Publisher, but not in Java, so how would implementations "magically become this thing"?

If two different Java implementations of Reactive Streams have their own types which both look like interface Processor extends Subscriber, Publisher, they will still not be interoperable directly but will need trivial wrappers. That trivial inconvenience may be a small reason to include such an interface in the spec.
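For concreteness, a sketch of such a trivial wrapper, with LibraryAProcessor and LibraryBProcessor as hypothetical stand-ins for two libraries' home-grown types:

    // Each library rolls its own "both ends" type on top of the shared interfaces.
    interface LibraryAProcessor<T, R> extends Subscriber<T>, Publisher<R> {}
    interface LibraryBProcessor<T, R> extends Subscriber<T>, Publisher<R> {}

    // The adapter is mechanical, but every pair of libraries needs one in each direction.
    final class AToBProcessorAdapter<T, R> implements LibraryBProcessor<T, R> {
        private final LibraryAProcessor<T, R> delegate;
        AToBProcessorAdapter(LibraryAProcessor<T, R> delegate) { this.delegate = delegate; }

        public void onSubscribe(Subscription s)        { delegate.onSubscribe(s); }
        public void onNext(T element)                  { delegate.onNext(element); }
        public void onError(Throwable t)               { delegate.onError(t); }
        public void onComplete()                       { delegate.onComplete(); }
        public void subscribe(Subscriber<? super R> s) { delegate.subscribe(s); }
    }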

@igstan

igstan commented May 22, 2014

There is a way to mimic Subscriber with Publisher in Java using multiple bounds. It's more limited though: it's impossible to use this trick to declare the type of a field without adding a new generic type parameter to the class.

public <T, Processor extends Subscriber<T> & Publisher<T>> void quux(Processor p) {}
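The field limitation mentioned above bites as soon as you want to store such a value: the only way is to thread the intersection bound through the enclosing class, e.g. (hypothetical sketch):

    // A plain field of type "Subscriber<T> & Publisher<T>" is not legal Java; the bound
    // has to become a type parameter of the enclosing class.
    final class ProcessorHolder<T, P extends Subscriber<T> & Publisher<T>> {
        private final P processor;
        ProcessorHolder(P processor) { this.processor = processor; }
        P get() { return processor; }
    }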

@danarmak

I didn't know that. Thanks, and sorry for raising irrelevant objections.

@viktorklang
Contributor

👍 on adding Processor. @igstan's trick is very neat but it doesn't work out in practice:

Exhibit A:

✗ cat Foo.java
public class Foo {
  public interface Pig<T> {}
  public interface Dog<T> {}

  public static <T, Pigdog extends Pig<T> & Dog<T>> Pigdog foo(Pigdog pd) { return pd; }

  public static void main(String[] args) {
    Pig<Integer> & Dog<Integer> pd = foo(null);
  }
}

✗ javac Foo.java
Foo.java:8: error: not a statement
    Pig<Integer> & Dog<Integer> pd = foo(null);
       ^
Foo.java:8: error: ';' expected
    Pig<Integer> & Dog<Integer> pd = foo(null);
                ^
2 errors

Arguments for Processor (needs counter-arguments):

  1. Java's type system is not sophisticated enough to represent Publisher<T> with Subscriber<R> without introducing a new, concrete, type. (see Exhibit A)
  2. Processor<T,R> is as ubiquitous as Function<T,R>, it is a means of abstracting/composing detached processing stages.
  3. We already agree that many implementations will have to implement Processor—with poor to no possibility of offering generic combinators over Processors (since they will not have the same JVM type)
  4. It is easier (quicker adoption) to deprecate the Processor in Reactive Streams if deemed needed than to get all other implementations to deprecate their Processor implementations and switch to Reactive Streams Processor if we decide to introduce it later.
  5. Having Processor in the spec makes it possible for us to add specification to it if needed, to avoid diverging behaviors if implementations define their own Processor type.
  6. It is 1 line of Java code—extremely cheap to maintain.

Looking forward to seeing these arguments shot down.

Cheers,

@smaldini
Contributor

Well, I'm in line with Viktor on this. +1


@rkuhn
Member

rkuhn commented May 22, 2014

Ben, as much as I understand where you are coming from, you are neglecting an important detail: the JVM’s intrinsic type system (i.e. not even Java’s but the byte-code one) is fully nominal, which means that implementing both Subscriber and Publisher does not magically make this a Processor-like thing that can be shared. Every library that introduces such a type by necessity makes a new one that is not compatible with any of the others.

The (very useful) operation that you then cannot write generically is def append(p: Processor[T, U]): Publisher[U] (on a Publisher[T]), because a Reactor.Processor cannot stand in for an Akka.Processor even though they have the exact same structure—due to their different names.

I am convinced that we will see multiple libraries creating and using variations of this type, and even if RxJava will never use it this will still be a desirable addition.

Towards your description of Rx Operators: this is not as generically useful, it is more a detail of how you choose to design your DSL (which means that I fully agree with your assessment that Rx’s Operator is not a good fit for Reactive Streams in general). A TCP client connection would be perfectly represented as a Processor[ByteString, ByteString], because that makes it obvious that we are dealing with two asynchronous boundaries (which is indisputable in this case) while using the Operator terminology does not fit this bill.
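In Java terms, the append operation above is something like the following sketch, which can be written exactly once if (and only if) there is a single shared Processor type of the "Subscriber plus Publisher" shape discussed in this thread:

    // Written once against the shared type; without it, every library's own
    // Reactor.Processor / Akka.Processor would need its own overload.
    static <T, U> Publisher<U> append(Publisher<T> upstream, Processor<T, U> stage) {
        upstream.subscribe(stage); // the stage consumes the upstream ...
        return stage;              // ... and is itself the Publisher of the results
        // (Whether to subscribe eagerly like this or defer until the returned
        // Publisher is itself subscribed is exactly the pitfall Ben's examples
        // later in this thread explore.)
    }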

@benjchristensen
Contributor Author

so how would implementations "magically become this thing"?

Ah, so if I understand correctly, this is about supporting a method that takes a Processor. If that's the case, then yes, Java can't do that.

My point was that it is fully possible in Java to just have a type that implements Publisher, Subscriber so we didn't need a type to do that.

But if we want to support having a method like this:

void doStuff(Type that extends Publisher, Subscriber)

then yes, Java can't do that and we'd need a type that unites them.

@rkuhn I'm convinced that the type is useful if trying to do the above.

@benjchristensen
Contributor Author

Are you thinking it would be optional for libraries to implement this? If it is required, must the Publisher have a method like process(Processor p)?

@benjchristensen
Contributor Author

obvious that we are dealing with two asynchronous boundaries

No, the Processor does not do that. It can be fully synchronous as per decisions in #46 and often will be (such as filter and map).

The Operator and Processor types both support synchronous and asynchronous processing.

@danarmak

@benjchristensen, isn't publisher.process(processor) equivalent to:

publisher.subscribe(processor);
return processor // As the next Publisher

If so, the method doesn't need to be included in the spec.

@benjchristensen
Contributor Author

Regarding concurrency ...

Can you demonstrate this problem? Following the contract/spec is not unique
to Processor.

What do you mean about demonstrating this? Just have two threads call onNext. I'm not saying the contract doesn't cover it, it's just not good design to make the spec so easy to break. I know this from the experience of watching people try and use the Rx Subject type. It is a sharp, double-edged sword. And this Processor signature is the exact same thing as `Subject`.

@benjchristensen
Contributor Author

subscribe from a Subscriber and then subscribe upwards. This means extra
object allocations and complexity.

Can you prove that? I don't see how that follows its definition currently.

Sure ... it'll take some time to write the code matching this spec, but we spent weeks figuring this out in the refactoring from RxJava 0.16 -> 0.17 when we migrated away from exactly what this Processor type signature requires.

@benjchristensen
Contributor Author

Also, no one has contested my 6
Processor arguments yet, I'm looking forward to having them put to the
test, so to say.

What do you mean by this? What "6 Processor" arguments have not been contested?

@viktorklang
Contributor

On Thu, May 22, 2014 at 6:05 PM, Ben Christensen wrote:

It's pretty good to just have it a part of the spec so that if someone
does a .NET impl they also have to retain it?

Sorry, by "spec" I meant the contract. It can definitely live as a type.
There just aren't any requirements or rules that we need other than the
fact that this type exists.

Then I totally agree! :)



Cheers,

@viktorklang
Contributor

On Thu, May 22, 2014 at 6:07 PM, Ben Christensen wrote:

Regarding concurrency ...

Can you demonstrate this problem? Following the contract/spec is not unique
to Processor.

What do you mean about demonstrating this? Just have two threads call
onNext. I'm not saying the contract doesn't cover it, it's just not good
design to make the spec so easy to break. I know this from the experience
of watching people try and use the Rx Subject type. It is a sharp,
double-edged sword. And this Processor signature is the exact same thing
as `Subject`.

Implementing the spec properly requires reading and understanding the spec. If you implement a Publisher there are rules to follow; this is beside the point of Processor. To make it easy/simple for end-users to implement custom Processors/Publishers/Subscribers, the libraries can offer shims/formulas.

def safeSubscriber[T](us: Subscriber[T], logger: Logger): Subscriber[T] =
  new Subscriber[T] {
    override def onSubscribe(subscription: Subscription) =
      try us.onSubscribe(subscription) catch {
        case NonFatal(t) => onError(new IllegalStateException("Reactive Streams Specification violation: onSubscribe threw an exception", t))
      }

    override def onNext(element: T) =
      try us.onNext(element) catch {
        case NonFatal(t) => onError(new IllegalStateException("Reactive Streams Specification violation: onNext threw an exception", t))
      }

    override def onError(throwable: Throwable) =
      try us.onError(throwable) catch {
        case NonFatal(t) => logger.error(t, "Reactive Streams Specification violation: onError for {} threw an exception while handling {}.", us, throwable)
      }

    // … and so on and so forth
  }



Cheers,

@danarmak

@benjchristensen I think I caused a misunderstanding:

Yes, in a hot stream subscription order doesn't matter. In a cold stream though it does matter. If I start streaming a file and have subscribed at the top already, but the bottom of the sequence is not yet subscribed, I'll lose data.

I may have misused the term 'cold publisher'. I meant the following concept:

A publisher that never drops any elements. It waits for the subscribers to request more before publishing each element. If there are multiple subscribers, it waits for all of them (so the slowest one determines the overall speed). If there are no subscribers, it waits for someone to subscribe before publishing at all.

This is basically a description of the behavior of a part of my implementation. I think it emerges naturally from the weaker concept of a 'cold publisher'.

Suppose you have a function that creates a Publisher[ByteString] representing a file. If it starts publishing right away, you'll miss data before anyone can subscribe. So either it should not publish until the first subscriber is subscribed (what I do), or the function that creates the Publisher must take the subscriber as a parameter (I think that is less elegant).

The rest is a natural outcome of the desire not to drop any data once the stream is active. Given this behavior, subscription order doesn't matter.

Except for the case of multicast, where the second subscriber may miss the first elements already passed to the first subscriber. The only way I know to make that deterministic is either to introduce an explicit start() method on your Publisher, or to use a 'broadcast' processor which is created when all subscribers are already available, so they're all effectively subscribed at once.
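As a rough illustration of the "never drops, waits for demand" behaviour described above, a deliberately simplified single-subscriber sketch (it ignores thread-safety, reentrancy, the n <= 0 check, and real cancellation):

    // Emits nothing until someone has subscribed *and* requested demand.
    final class ColdIterablePublisher<T> implements Publisher<T> {
        private final Iterable<T> source;
        ColdIterablePublisher(Iterable<T> source) { this.source = source; }

        public void subscribe(Subscriber<? super T> subscriber) {
            java.util.Iterator<T> it = source.iterator();
            subscriber.onSubscribe(new Subscription() {
                private boolean done = false;
                public void request(long n) {
                    if (done) return;
                    for (long i = 0; i < n && it.hasNext(); i++) subscriber.onNext(it.next());
                    if (!it.hasNext()) { done = true; subscriber.onComplete(); }
                }
                public void cancel() { done = true; }
            });
        }
    }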

@viktorklang
Contributor

On Thu, May 22, 2014 at 6:09 PM, Ben Christensen wrote:

Also, no one has contested my 6
Processor arguments yet, I'm looking forward to having them put to the
test, so to say.

What do you mean by this? What "6 Processor" arguments have not been
contested?

#22 (comment)



Cheers,

@benjchristensen
Contributor Author

Looking forward to seeing these arguments shot down.

Those 6 I am fine with.

@smaldini
Contributor

I guess we are leaning towards +1 then. I don't think this will be a risky operation. Shall we make it 0.4? I'll PR in a minute.


@benjchristensen
Contributor Author

I was wrong in some of my assertions earlier. I spent a while writing code and performance tests and came up with this: https://github.com/benjchristensen/StreamProcessor

Take a look at the README there for the full information, and the links to the code.

The summary is:

  • process (using Processor) is slightly faster (27.6m vs 26.7m ops/second) than lift (using Operator) when doing a single transformation.
  • lift is slightly faster (15.4m vs 13.3m ops/second) than process when doing multi-step transformations.

Based on these results, the Processor interface actually outperforms the Operator definition in common cases. The Operator signature does seem to have value in the cases where multiple transformations are chained together (common in Rx and one of the drivers to our adoption of this as of v0.17).

Sorry about being mistaken earlier; hopefully this data and experiment correctly present the options.

@benjchristensen
Contributor Author

@danarmak Take a look at these 3 approaches to implementing a process method. I'm sure there are others, but I came up with these 3:

    /**
     * This works in all the use cases because each subscription gets a fresh Processor from the supplier.
     */
    public <R> APublisher<R> process(Supplier<Processor<T, R>> supplier) {
        return new APublisher<R>((s) -> {
            Processor<T, R> p = supplier.get();
            p.subscribe(s);
            f.accept(p); // plays the role of subscribe(p) in processSimple below, but deferred until this new publisher is subscribed
        });
    }

   /**
     * This will blow up because it happens in the wrong order
     * 
     * @param p
     * @return
     */
    public <R> Publisher<R> processSimple(Processor<T, R> p) {
        subscribe(p);
        return p;
    }

    /**
     * This works for a single subscription, but not when subscribed to multiple times because the `Processor` instance gets reused
     * 
     * @param p
     * @return
     */
    public <R> Publisher<R> process(Processor<T, R> p) {
        return new APublisher<R>((s) -> {
            p.subscribe(s);
            f.accept(p);
        });
    }

I use Test.java to play with each of these and show the problems with the latter 2 and how the 1st (process(Supplier<Processor<T, R>> supplier)) works in all the use cases.

@benjchristensen
Contributor Author

I guess we are leaning towards +1 then. I don't think this will be a risky operation. Shall we make it 0.4 ? I'll PR in a minute.

I'm fine adding the type. We are not ready to define how a Publisher would expose a mechanism for using it at this time (if ever).

@danarmak

@benjchristensen your FiniteSynchronousLongPublisher is unicast, and it starts publishing only once the subscriber calls request() (i.e. it doesn't drop items before there's a subscriber). If every Processor down the line also worked like this, then subscription order wouldn't matter - elements won't be lost even if you subscribe top to bottom, because no elements will be sent until the final Consumer is connected which sends the first ever request(n). This is the solution I adopted.

Your code is a bit different because your MapProcessor passes along the Subscription from upstream; it doesn't generate one itself. That makes it less general: for instance, it can't represent a mapping that isn't one-to-one, because it doesn't intercept the calls to request(n). And for this lightweight implementation to work, it requires subscription in bottom-to-top order. I don't think that impacts the general case; I'm pretty sure it's possible to implement a lightweight Processor in a top-to-bottom (or an unordered) subscription paradigm.
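For contrast, a minimal hypothetical sketch of the pass-through style described here (not the actual MapProcessor code): the upstream Subscription is handed straight to the downstream subscriber, which is why it only works for strictly 1-to-1 stages and only when the downstream end is subscribed first:

    // 1-to-1 only: downstream request(n) goes straight to the upstream Subscription.
    // No buffering, no error handling; assumes bottom-to-top subscription order.
    final class PassThroughMapProcessor<T, R> implements Processor<T, R> {
        private final java.util.function.Function<T, R> mapper;
        private Subscriber<? super R> downstream;
        PassThroughMapProcessor(java.util.function.Function<T, R> mapper) { this.mapper = mapper; }

        public void subscribe(Subscriber<? super R> s) { this.downstream = s; }
        public void onSubscribe(Subscription s)        { downstream.onSubscribe(s); } // passed straight through
        public void onNext(T element)                  { downstream.onNext(mapper.apply(element)); }
        public void onError(Throwable t)               { downstream.onError(t); }
        public void onComplete()                       { downstream.onComplete(); }
    }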

@benjchristensen
Contributor Author

Yes, if everything is synchronous it would work regardless of order. Once it goes async (the other publisher impl I provide in the example) things change. And it would change dramatically again if every Processor was async and decoupled the subscription (including object allocation).

I tried to start with the simplest and most efficient implementation and then start adding complexity, such as an async publisher. I have not yet created an async processor.

What would you change or do as an example to test a scenario that you're thinking is more general or applicable? I'd like to capture some of these cases and show the performance impact of different choices.

@smaldini
Contributor

Agree


@danarmak

@benjchristensen the generalization that immediately comes to mind is mappings that are not 1-to-1. Even a synchronous 1-to-n (e.g. emit 3 items for each 1 input) requires intercepting and modifying the request(n) calls, and either buffering the output produced on each step, or making the transformation function return a continuation or be a mutable state machine.

The other common generalization is I think asynchronicity, where the mapping function returns a Future of some kind.

The most general equivalent of a Processor that I wrote looks like this (simplified, my code isn't online yet):

abstract class Pipe[T] extends Source[T] with Sink[T] {
  protected def emit(output: Option[T]): Future[Unit] = ... // Implementation omitted
  protected def process(input: Option[T]): Future[Unit]
}

The Option[T] types follow the rule that None signifies end-of-input, corresponding to onComplete.

An implementation of Pipe needs to implement the process method. It can call emit zero or more times, but not concurrently. emit takes care of sending the data to subscribers; it returns a Future because it may need to wait for the subscriber to request more data. emit can only be called by process or the continuations created by process.

The future returned by process should complete when it has finished processing the input and emitting all output. It is guaranteed that process will be called sequentially wrt. the futures it returns. An event loop (made up of mapped Futures) links input, output (demand) and process.

This is a Future-based state machine, and a pretty powerful tool for building processors which aren't one-to-one and aren't necessarily synchronous. But the downside is that you have to create and possibly schedule a Future for each input and each output item.

If the implementation of process is known to be synchronous, the loop can be made a bit more efficient. Maybe explicitly passing around continuations would be faster than mapping on futures that are created as completed promises. I haven't tried writing an optimized solution along those lines yet.

If you're interested, the corresponding state machines for Producers and Consumers look as follows (again, simplifying):

trait Source[T] {
  protected def produce(): Future[Option[T]]
}

trait Sink[T] {
  protected def process(input: Option[T]) : Future[Unit]
}

produce is called to generate the next output element. process is just like Pipe.process except there's no emit. Both have the guarantee of not being called concurrently.

@benjchristensen
Contributor Author

I'm sorry, I don't fully understand how these examples fit what we're discussing here, as I don't see where Publisher and Subscriber fit into the types and code shown. Is this a generalization on top of Reactive Streams, or separate from it?

mappings that are not 1-to-1. Even a synchronous 1-to-n (e.g. emit 3 items for each 1 input) requires intercepting and modifying the request(n) calls, and either buffering the output produced on each step, or making the transformation function return a continuation or be a mutable state machine.

Yes, those use cases require a different implementation than the 1-to-1 scenario, but they can all be implemented on top of the defined Publisher/Subscriber types, and with or without the Processor/Operator types we're discussing in this issue.

@danarmak

Source extends Publisher, Sink extends Subscriber, and Pipe extends Processor. They are intended as a complete implementation of Reactive Streams on top of Scala Futures, and they provide some additional guarantees beyond those of R.S., but those aren't important here.

I'm sorry I can't link to the actual code, I'm still waiting for the internal legal review, but I hope to publish a usable library in a week or so. I've already migrated some of our own company's code to this implementation. We needed a stream abstraction in Scala, couldn't wait for akka-streams, and I decided to make it implement Reactive Streams too.

@benjchristensen
Contributor Author

Ah, that helps to know that they extend from Publisher and Subscriber.

What do you mean by "on top of Futures"? Are you implementing streams where each onNext is produced by a Future? Or just that they are the return types to represent when an async method has completed?

The method signatures suggest the latter. If so, has the object allocation of creating a Future per onNext not proven to be an issue? I found it to be far too expensive when I explored that route.

I look forward to seeing the code so I can better understand.

@danarmak

Are you implementing streams where each onNext is produced by a Future?

I'm not sure what you mean by onNext being produced. The next element to be sent to onNext is produced by the produce function, which returns a Future.

If so, has the object allocation of creating a Future per onNext not proven to be an issue?

It's definitely an issue. I reuse Futures of type Unit and Boolean where I can, but I have to create a Future wrapper for each element that passes through produce/emit.

The immediate goal of my library is getting a full implementation of Reactive Streams (with many constructors, combinators, etc) up and working as quickly as possible, because we really need it in-house and can't wait for akka-streams or Rx to be stable. Using Futures is the easiest and most solid / well supported route to implementation. It also gives a lot of flexibility since I can choose the ExecutionContext for each Source and Sink separately at run time.

Our in-house use of streams almost always begins and ends with IO, and the stream elements are relatively large, so I have reason to hope even the current performance will be sufficient.

The obvious way to improve the performance of my implementation is to add support for synchronous constructors and combinators, which would replace many unnecessarily asynchronous components I write at present. A synchronous Source.map wouldn't create a new Source, in the sense of a component that can accept subscriptions independently from the original Source. It would just take a synchronous function, possibly with mutable state support, store it in the original Source implementation and call it synchronously after the original produce.

This does involve a user-visible change: I will have to introduce a public type representing these synchronous transformations to be passed around. This will make code more complex, because some code will need a way to accept either a synchronous transformation or an asynchronous Pipe. I don't know what the common interface of these two should be. I also don't want the library to grow into something as large and feature-full as Rx; I want a relatively simple implementation of Reactive Streams. The future will show whether in the long term this implementation is needed after both akka-streams and Rx Reactive Streams are ready for production.

@viktorklang
Contributor

It looks to me like there is consensus that Processor makes sense and should stick around; can we move to close this Issue?

Speak now or forever hold your peace.

@smaldini
Contributor

Go for it :)


@viktorklang
Contributor

@smaldini See the last linked commit in this Issue.

@smaldini
Contributor

cheers @reactive-streams/contributors

viktorklang added a commit that referenced this issue May 30, 2014
Fixes (re-adds) the processor interface as per consensus in Issue #22