Skip to content

Reactive IPC Functional Scope #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
benjchristensen opened this issue Mar 6, 2015 · 6 comments
Open

Reactive IPC Functional Scope #3

benjchristensen opened this issue Mar 6, 2015 · 6 comments

Comments

@benjchristensen
Copy link
Contributor

Discussion to establish scope of client/server/protocol functionality for Reactive IPC.

@benjchristensen
Copy link
Contributor Author

tldr; A user should be able to use only Reactive IPC to run a server or client.

My perspective is that one should be able to use Reactive IPC directly to create and run a client or server. Reactive IPC should not just be an SPI that is useless by itself. In other words, without further abstractions on top I should be able to use Reactive IPC APIs, transports and Reactive Streams callbacks.

For example, I should be able to do the following:

  • instantiate a server with a handler on a given host and port
  • accept traffic and write responses
  • receive callbacks via Publisher/Subscriber

On the client side I would similarly expect to be able to:

  • execute a request
  • receive callbacks via Publisher/Subscriber

Layers on top are to expose different stream compositions approaches (such as RxJava), usability opinions, etc.

@benjchristensen
Copy link
Contributor Author

For example, I want to be able to do similar to what RxNetty offers (ignoring the API design, just the functionality):

ReactiveIPC.createHttpServer(8999, (req, resp) -> resp.writeStringAndFlush("Hello")).start();

Further opinions would all layer on top of these basic capabilities. For example, Netlix Ribbon and Karyon would hide all of this from users, as would Spring Boot and Spring Web, all of which would have different opinions of how to start servers or execute requests.

@smaldini
Copy link
Contributor

smaldini commented Mar 9, 2015

I think writeAndFlush should map on the Reactive Stream contract directly:

On the input side (registering a Subscriber to a request Publisher):
Close -> Subscription.cancel()
Request N IO Read -> Subscription.request(long n), Long.MAX -> unbounded read
Input Ready -> Subscriber.onSubscribe
Input Read -> Subscriber.onNext
Input Error -> Subscriber.onError
Input Closed (reset by peer) -> Subscriber.onComplete

On the output side:
Output Close -> might be cancelling upstream and so close connection
Output ready to write N Data -> Subscription.request(long n), Long.MAX -> unbounded
Output Start (possibly Headers) -> Subscriber.onSubscribe
Output Write -> Subscriber.onNext
Output Flush -> Subscriber.onComplete (use of windows for batch flush), cancel upstream
Output Error -> Subscriber.onError

@jbrisbin
Copy link
Contributor

To get the discussion going, the initial commit I pushed adds just two abstractions to represent TCP communication: Connection and ConnectionPublisher. Since each protocol will have abstractions that represent its basic constructs I went with the TCP naming of "connection" rather than Netty's more general "channel" abstraction. But honestly I'm fine with either one and I don't really have a preference.

I did not implement the connection-level read other than to simply use the pending value as an indication of whether to handle data or not. The initial commit is just a means to an end and the implementation code is really just to try and figure out what commonalities are going to exist that need to be in core.

I also added a Buffer abstraction in core. I put it in the javadoc but it should be noted that a Buffer in ripc terms does not mean just a variation on byte[] or ByteBuffer. The only (partial) implementation that exists right now is one based on Netty's ByteBuf, but the Buffer abstraction itself doesn't care what the underlying data actually is. It could be a Buffer<Frame> or a Buffer<Pojo>. This will provide simple read-and-convert functionality for common things like length-field-based encoding that uses Snappy or Zip compression, etc... that's going to be nearly identical for all use cases.

@benjchristensen
Copy link
Contributor Author

Thanks for submitting code ... will review and then discuss further.

@NiteshKant
Copy link
Contributor

I think writeAndFlush should map on the Reactive Stream contract directly:

Are you suggesting onComplete flushing the writes?

Output Flush -> Subscriber.onComplete (use of windows for batch flush), cancel upstream

If yes, then this isn't addressing all the usescases. This would mean that all writes are buffered in the channel outbound buffer, till the stream completes. For infinite streams, never, for slow streams, causes buffer bloat.

We would atleast need a semantic like:

write(Observable<W> msgs, Func1<W, Boolean> flushSelector);

which gets a feedback from a flushSelector about when to flush (invoked after every element emission). A simplistic version being flushOnEach where flushSelector always returns true.

@jbrisbin I have questions about the implementation and let me create separate issues to have a focussed discussion on those different points.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants