Remove Processor Interface #22
Comments
It is crucial to be able to represent a detached "stage" (having an inlet and an outlet), and since Java has a, erm, "rustic" type system, you cannot represent |
Please provide some pseudo code as I don't see how the interop goals have anything to do with detached stages. |
I will feed the thread with some reactor code which implements Processor. |
Pseudo:
|
Sorry I'm so slow, but I'm still not seeing what that example has to do with interop between libraries. Can you give me an example of how Akka -> Rx -> Reactor would need |
I'm thinking: Rx -> AnyGenericReactiveStreamsImpl -> Rx as in:
As mentioned before, the only reason it is needed is because Scala is the only(?) jvm-lang with a typesystem that supports saying Subscriber with Publisher without requiring that to be a concrete type, i.e. interface Processor extends Subscriber with Publisher. The alternative encoding would be to do something runtime-fail-y like:
Thoughts? |
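To make the two encodings discussed above concrete, here is a minimal Java sketch. It is not code from the thread: the `Subscriber` and `Publisher` interfaces are simplified stand-ins (no `Subscription`, no back-pressure), and `Stages`, `connect` and `connectUnchecked` are hypothetical names chosen for illustration. The first method relies on a shared `Processor` type; the second shows one way a "runtime-fail-y" alternative might look, assuming it means accepting a `Subscriber` and casting.

```java
// Simplified stand-ins for the Reactive Streams interfaces (assumption: no Subscription).
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); }

// The unifying type under discussion: one nominal type with an inlet and an outlet.
interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

final class Stages {
    // Statically checked: the compiler guarantees the stage is both a Subscriber and a Publisher.
    static <T, R> Publisher<R> connect(Publisher<T> upstream, Processor<T, R> stage) {
        upstream.subscribe(stage);
        return stage;
    }

    // Hypothetical runtime-checked alternative: accept any Subscriber and cast it.
    // A stage that is not also a Publisher is only rejected when this method runs.
    @SuppressWarnings("unchecked")
    static <T, R> Publisher<R> connectUnchecked(Publisher<T> upstream, Subscriber<T> stage) {
        if (!(stage instanceof Publisher)) {
            throw new IllegalArgumentException("stage is not also a Publisher");
        }
        upstream.subscribe(stage);
        return (Publisher<R>) stage;
    }
}
```

With the second form, passing a plain `Subscriber` compiles and only fails when the call executes, which is the trade-off the comment above alludes to.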
Would it be correct to say that the only reason this would be needed is if we are wanting to define interoperable transformations? If so, that's beyond the original scope of allowing interop of streams but moves into interoperable stream processing. |
@benjchristensen No it's just a consequence of a Publisher also being able to be a Subscriber. |
I think I understand what Ben means: passing around a Processor means that I give someone a piece of hose to fit it into a pipeline, which emphasizes the transformation being passed around. We have such a case in our TCP code, when you connect outwards you get back a Processor since you can view the server end as a transformation you send ByteStrings through. It would not be a big difference to say that you get a pair of Publisher and Subscriber instead, I think. The main question here is whether or not we will see many incompatible types being created in implementations, and I think the answer is yes: if you provide a DSL for creating some asynchronous processing steps, then the return type of the “create me the machine” method must return both interfaces at once (with Scala being the only possible exception). By not providing this type in the SPI we will foster the creation of many copies that only inter-operate single-sidedly (i.e. you can pass an Akka Processor as either a Publisher or Subscriber, but there is no shared Processor). I would argue for keeping this unification since most implementations will otherwise create one. |
Reopening as this discussion has been revived: #56 (comment) |
|
I envision it like this, where only the
For example in RxJava I could do this:
    Observable.from(reactiveStream).lift(processor_operator_function).subscribe(function)
The The original In Rx a
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
versus a
    public abstract class Subject<T, R> extends Observable<R> implements Observer<T>
... or in reactive-stream terminology ...
    public interface Subject<T, R> extends Publisher<R>, Subscriber<T>
This
    public interface Processor<R, T> {
        public Subscriber<? super T> call(final Subscriber<? super R> o);
    }
But then, for this to be standardized across all libraries (the only point of including At that point I feel like we are dictating far too much about how implementations will allow processing of data, or whether they'll even allow user defined operators, or only the ones provided by their library. Thus the |
Cheers Ben for the re-open and the comment. From an operator perspective it makes sense; however, I think this is not strictly speaking an operator, or maybe it is a Reactive Operator. I called lift because, functionally speaking, this seems to achieve the same goal somehow.
    Processor<Integer,Integer> identityProcessor =
        Action<Integer,Integer>.map(integer -> integer)
            .distinctUntilChanged()
            .scan(Tuple2<Integer, Integer>::getT1)
            .filter(integer -> integer >= 0)
            .combine();
    // combine creates an Action (implements Processor) whose subscribe delegates to its downstream Publisher
    // and whose onNext/onError/onSubscribe/onComplete delegate to its upstream Subscriber
    Streams
        .defer(Arrays.asList(1,2,3))
        .connect(identityProcessor) // could be AkkaFlow, Subject (?), Action
        .subscribe(reactOnIdentityProcessorSubscriber);
In my mind the purpose is to have the flexibility to package the head and tail of a Stream under a single object, in a coherent way across implementations. I stand corrected though: having played with it for the past couple of months, it looked like it fitted well at relatively little cost. Since it is maybe more niche than Publisher/Subscriber, could it be optional for a given implementation? I am pretty sure this would be appreciated by a few though, especially in distributed environments where you could hide the request-reply / reply-to pattern under this simple interface. |
@benjchristensen, you said:
In Scala we can declare a type If two different Java implementations of Reactive Streams have their own types which both look like |
There is a way to mimic public <T, Processor extends Subscriber<T> & Publisher<T>> void quux(Processor p) {} |
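A small sketch of the intersection-bound approach, assuming simplified `Subscriber`/`Publisher` stand-ins. `Relay`, `Intersection` and `wire` are hypothetical names, not part of any library mentioned in the thread. It shows a generic method accepting any type that is both a `Subscriber<T>` and a `Publisher<T>` without a named combined interface.

```java
// Simplified stand-ins for the Reactive Streams interfaces (assumption: no Subscription).
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); }

// Hypothetical library class: implements both sides but shares no Processor type.
final class Relay<T> implements Subscriber<T>, Publisher<T> {
    private Subscriber<? super T> downstream;

    @Override public void subscribe(Subscriber<? super T> subscriber) { downstream = subscriber; }

    @Override public void onNext(T element) {
        if (downstream != null) downstream.onNext(element);
    }
}

final class Intersection {
    // P must be both a Subscriber<T> and a Publisher<T>; no named union type is required.
    static <T, P extends Subscriber<T> & Publisher<T>> P wire(Publisher<T> upstream, P stage) {
        upstream.subscribe(stage);
        return stage;
    }
}
```

One limitation of this form is that the intersection exists only as a bound on a type parameter: it cannot be written as the type of a field or as a non-generic return type, so every signature that passes the stage along has to repeat the bound.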
I didn't know that. Thanks, and sorry for raising irrelevant objections. |
👍 on adding Processor. @igstan's trick is very neat but it doesn't work out in practice: Exhibit A:
Arguments for Processor (needs counter-arguments):
Looking forward to seeing these arguments shot down. Cheers, |
Well, I'm in line with Viktor on this. +1 |
Ben, as much as I understand where you are coming from, you are neglecting an important detail: the JVM’s intrinsic type system (i.e. not even Java’s but the byte-code one) is fully nominal, which means that implementing both Subscriber and Publisher does not magically make this a Processor-like thing that can be shared. Every library that introduces such a type by necessity makes a new one that is not compatible with any of the others. The (very useful) operation that you then cannot write generically is I am convinced that we will see multiple libraries creating and using variations of this type, and even if RxJava will never use it this will still be a desirable addition. Towards your description of Rx Operators: this is not as generically useful, it is more a detail of how you choose to design your DSL (which means that I fully agree with your assessment that Rx’s Operator is not a good fit for Reactive Streams in general). A TCP client connection would be perfectly represented as a Processor[ByteString, ByteString], because that makes it obvious that we are dealing with two asynchronous boundaries (which is indisputable in this case) while using the Operator terminology does not fit this bill. |
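The nominal-typing point above can be illustrated with a short sketch. `LibAProcessor`, `LibBProcessor`, `Identity` and `NominalDemo` are hypothetical names standing in for types that two independent libraries might define; the `Subscriber`/`Publisher` interfaces are simplified stand-ins.

```java
// Simplified stand-ins for the Reactive Streams interfaces (assumption: no Subscription).
interface Subscriber<T> { void onNext(T element); }
interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); }

// Two hypothetical libraries each define their own, structurally identical, combined type.
interface LibAProcessor<T, R> extends Subscriber<T>, Publisher<R> { }
interface LibBProcessor<T, R> extends Subscriber<T>, Publisher<R> { }

// A stage written against library A's type.
final class Identity<T> implements LibAProcessor<T, T> {
    private Subscriber<? super T> downstream;

    @Override public void subscribe(Subscriber<? super T> subscriber) { downstream = subscriber; }

    @Override public void onNext(T element) {
        if (downstream != null) downstream.onNext(element);
    }
}

final class NominalDemo {
    // False for library A's stage: identical members do not make the types compatible.
    static boolean usableAsLibB(Object stage) {
        return stage instanceof LibBProcessor;
    }

    // True: each side on its own is still usable through the shared interfaces.
    static boolean usableAsEitherSide(Object stage) {
        return stage instanceof Publisher && stage instanceof Subscriber;
    }
}
```

In this sketch an `Identity` can be handed to library B as a `Publisher` or as a `Subscriber`, but never as B's combined type, which is the single-sided interoperability described earlier in the thread.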
Ah, so if I understand correctly, this is about supporting a method that takes a My point was that it is fully possible in Java to just have a type that But if we want to support having a method like this: void doStuff(Type that extends Publisher, Subscriber) then yes, Java can't do that and we'd need a type that unites them. @rkuhn I'm convinced that the type is useful if trying to do the above. |
Are you thinking it would be optional for libraries to implement this? If it is required, must the |
No, the The |
@benjchristensen, isn't
If so, the method doesn't need to be included in the spec. |
Regarding concurrency ...
What do you mean about demonstrating this? Just have two threads call |
Sure ... it'll take some time to write the code matching this spec, but we spent weeks figuring this out in the refactoring from RxJava 0.16 -> 0.17 when we migrated away from exactly what this |
What do you mean by this? What "6 Processor" arguments have not been contested? |
On Thu, May 22, 2014 at 6:05 PM, Ben Christensen
Then I totally agree! :)
Cheers, |
On Thu, May 22, 2014 at 6:07 PM, Ben Christensen
def safeSubscriber[T](us: Subscriber[T], logger: Logger): Subscriber[T] = override def onNext(element: T) = try us.onNext(element) catch { override def onError(throwable: Throwable) = try us.onError(throwable) … and so on and so forth
Cheers, |
@benjchristensen I think I caused a misunderstanding:
I may have misused the term 'cold publisher'. I meant the following concept: A publisher that never drops any elements. It waits for the subscribers to request more before publishing each element. If there are multiple subscribers, it waits for all of them (so the slowest one determines the overall speed). If there are no subscribers, it waits for someone to subscribe before publishing at all. This is basically a description of the behavior of a part of my implementation. I think it emerges naturally from the weaker concept of a 'cold publisher'. Suppose you have a function that creates a The rest is a natural outcome of the desire not to drop any data once the stream is active. Given this behavior, subscription order doesn't matter. Except for the case of multicast where the second subscriber may miss the first elements already passed to the first subscriber. The only way I know to make that deterministic is either to introduce an explicit start() method on your Publisher, or a 'broadcast' processor which is created when all subscribers are already available, so they're all effectively subscribed at once. |
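As a rough illustration of the "slowest subscriber determines the overall speed" behaviour described above, the following sketch tracks outstanding demand per subscriber. It is not the commenter's implementation: `LockstepDemand` and its methods are hypothetical, subscribers are identified by plain strings, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for a publisher that never drops elements.
final class LockstepDemand {
    private final Map<String, Long> outstanding = new HashMap<>();

    // Register a subscriber with no demand yet.
    void subscribe(String id) {
        outstanding.putIfAbsent(id, 0L);
    }

    // Record that a subscriber requested n more elements.
    void request(String id, long n) {
        outstanding.merge(id, n, Long::sum);
    }

    // Elements that may be published now: the minimum demand across all
    // subscribers, or zero while nobody has subscribed.
    long emittable() {
        return outstanding.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    // Record that count elements were delivered to every subscriber.
    void emitted(long count) {
        if (count > emittable()) {
            throw new IllegalStateException("would exceed the slowest subscriber's demand");
        }
        outstanding.replaceAll((id, demand) -> demand - count);
    }
}
```

A subscriber that joins after elements have already been emitted starts with zero demand and simply misses the earlier elements, which matches the multicast caveat mentioned above.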
On Thu, May 22, 2014 at 6:09 PM, Ben Christensen
Cheers, |
Those 6 I am fine with. |
I guess we are leaning towards +1 then. I don't think this will be a risky operation. Shall we make it 0.4 ? I'll PR in a minute. |
I was wrong in some of my assertions earlier. I spent a while writing code and performance tests and came up with this: https://github.com/benjchristensen/StreamProcessor Take a look at the README there for the full information, and the links to the code. The summary is:
Based on these results, the Sorry about being mistaken earlier, hopefully this data and experiment correctly presents the options. |
@danarmak Take a look at these 3 approaches to implementing a
    public <R> APublisher<R> process(Supplier<Processor<T, R>> supplier) {
        return new APublisher<R>((s) -> {
            Processor<T, R> p = supplier.get();
            p.subscribe(s);
            f.accept(p);
        });
    }

    /**
     * This will blow up because it happens in the wrong order
     */
    public <R> Publisher<R> processSimple(Processor<T, R> p) {
        subscribe(p);
        return p;
    }

    /**
     * This works for a single subscription, but not when subscribed to multiple times because the `Processor` instance gets reused
     */
    public <R> Publisher<R> process(Processor<T, R> p) {
        return new APublisher<R>((s) -> {
            p.subscribe(s);
            f.accept(p);
        });
    }
I use Test.java to play with each of these and show the problems with the latter 2 and how the 1st ( |
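The reuse problem noted in the Javadoc above can be reproduced in isolation. The sketch below is an illustration under simplifying assumptions, not code from the linked repository: `Indexer` is a hypothetical stateful stage, `Source` replays two fixed elements per subscription, and there is no `Subscription` or back-pressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified stand-in for the Reactive Streams Subscriber (assumption: no Subscription).
interface Subscriber<T> { void onNext(T element); }

// Hypothetical stateful stage: prefixes each element with a running index.
final class Indexer implements Subscriber<String> {
    private int next = 0;
    private Subscriber<? super String> downstream;

    void subscribe(Subscriber<? super String> subscriber) { downstream = subscriber; }

    @Override public void onNext(String element) {
        downstream.onNext(next++ + ":" + element);
    }
}

final class Source {
    // Each subscription pushes "a" and "b" through the given stage into the sink.
    static void subscribeThrough(Indexer stage, Subscriber<String> sink) {
        stage.subscribe(sink);
        stage.onNext("a");
        stage.onNext("b");
    }

    // Reuses one stage instance for two subscriptions: the index keeps counting.
    static List<String> runTwiceShared(Indexer shared) {
        List<String> out = new ArrayList<>();
        subscribeThrough(shared, out::add);
        subscribeThrough(shared, out::add);
        return out;
    }

    // Asks the supplier for a fresh stage per subscription: state starts over.
    static List<String> runTwiceFresh(Supplier<Indexer> supplier) {
        List<String> out = new ArrayList<>();
        subscribeThrough(supplier.get(), out::add);
        subscribeThrough(supplier.get(), out::add);
        return out;
    }
}
```

Here `runTwiceShared(new Indexer())` yields `0:a, 1:b, 2:a, 3:b`, while `runTwiceFresh(Indexer::new)` yields `0:a, 1:b, 0:a, 1:b`, which is why the `Supplier` variant is the one that stays correct across repeated subscriptions.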
I'm fine adding the type. We are not ready to define how a Publisher would expose a mechanism for using it at this time (if ever). |
@benjchristensen your Your code is a bit different because your |
Yes, if everything is synchronous it would work regardless of order. Once it goes async (the other publisher impl I provide in the example) things change. And it would change dramatically again if every Processor was async and decoupled the subscription (including object allocation). I tried to start with the simplest and most efficient implementation and then start adding complexity, such as an async publisher. I have not yet created an async processor. What would you change or do as an example to test a scenario that you're thinking is more general or applicable? I'd like to capture some of these cases and show the performance impact of different choices. |
Agree |
@benjchristensen the generalization that immediately comes to mind is mappings that are not 1-to-1. Even a synchronous 1-to-n (e.g. emit 3 items for each 1 input) requires intercepting and modifying the The other common generalization is I think asynchronicity, where the mapping function returns a The most general equivalent of a
The An implementation of The future returned by This is a If the implementation of If you're interested, the corresponding state machines for Producers and Consumers look as follows (again, simplifying):
|
I'm sorry, I don't fully understand how these examples fit what we're discussing here, as I don't see where
Yes, those use cases require a different implementation than the 1-to-1 scenario, but they can all be implemented on top of the defined |
I'm sorry I can't link to the actual code, I'm still waiting for the internal legal review, but I hope to publish a usable library in a week or so. I've already migrated some of our own company's code to this implementation. We needed a stream abstraction in Scala, couldn't wait for akka-streams, and I decided to make it implement Reactive Streams too. |
Ah, that helps to know that they extend from What do you mean by "on top of Futures"? Are you implementing streams where each The method signatures suggest the latter. If so, has the object allocation of creating a I look forward to seeing the code so I can better understand. |
I'm not sure what you mean by onNext being produced. The next element to be sent to onNext is produced by the
It's definitely an issue. I reuse Futures of type Unit and Boolean where I can, but I have to create a Future wrapper for each element that passes through produce/emit. The immediate goal of my library is getting a full implementation of Reactive Streams (with many constructors, combinators, etc) up and working as quickly as possible, because we really need it in-house and can't wait for akka-streams or Rx to be stable. Using Futures is the easiest and most solid / well supported route to implementation. It also gives a lot of flexibility since I can choose the Our in-house use of streams almost always begins and ends with IO, and the stream elements are relatively large, so I have reason to hope even the current performance will be sufficient. The obvious way to improve the performance of my implementation is to add support for synchronous constructors and combinators, which would replace many unnecessarily asynchronous components I write at present. A synchronous This does involve a user-visible change: I will have to introduce a public type representing these synchronous transformations to be passed around. This will make code more complex, because some code will need a way to accept either a synchronous transformation or an asynchronous |
It looks to me like there is consensus that Processor makes sense and should stick around. Can we move to close this issue? Speak now or forever hold your peace. |
Go for it :) |
@smaldini See the last linked commit in this Issue. |
cheers @reactive-streams/contributors |
Fixes (re-adds) the processor interface as per consensus in Issue #22
What is the purpose of Processor being in the API?
Based on my reading of the code and understanding of the intents of this API, I suggest removing the Processor class: https://github.com/reactive-streams/reactive-streams/blob/master/spi/src/main/java/org/reactivestreams/api/Processor.java
It does not seem relevant to the purpose of this project. Libraries such as Akka, RxJava, Reactor, Finagle etc. can and will layer logic on top of the streams of data and integrate via the Subscriber class to receive and transform data.