Skip to content

Proposal for TCP server at all layers (core, transport & rxjava1) #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ allprojects {
"-Xlint:-rawtypes" // TODO enable and fix warnings
]

compileJava {
sourceCompatibility = 1.7
targetCompatibility = 1.7
}

compileTestJava {
sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down Expand Up @@ -87,9 +82,29 @@ project('ripc-protocol-tcp') {
dependencies {
// ripc-core
compile project(":ripc-core")
compile "org.slf4j:slf4j-api:1.7.6"
testCompile 'io.reactivex:rxjava:1.0.8'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO We shouldn't mix layers. We should provide mocks and stick to RS APIs in layers other than composition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah this was a left-over from me having the example (as a test) inside ripc-protocol-tcp demonstrating the usage. This is a test dep though. I will remove it, it isn't used.

testCompile 'io.reactivex:rxjava-reactive-streams:0.5.0'
}
}

project('ripc-transport-netty4-examples') {
description = 'Reactive IPC TCP Component examples'

compileTestJava {
sourceCompatibility = 1.8
targetCompatibility = 1.8
}

dependencies {
// ripc-core
compile project(":ripc-transport-netty4")
compile 'io.reactivex:rxjava:1.0.8'
compile 'io.reactivex:rxjava-reactive-streams:0.5.0'
compile 'org.slf4j:slf4j-simple:1.7.6'
}
}

project('ripc-transport-netty4') {
description = 'Reactive IPC Netty 4.x Transport Implementation'
dependencies {
Expand All @@ -98,17 +113,32 @@ project('ripc-transport-netty4') {

// Netty
compile "io.netty:netty-all:$nettyVersion"
compile "org.slf4j:slf4j-api:1.7.6"
}
}

project('ripc-composition-rxjava1') {
project('ripc-rxjava1') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking out the reference to "composition" doesn't tell me what this module's responsibility is now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason to remove "composition" was to not indicate that it is just for functional composition. In the other issue, we are discussing to rename the layer to "adapter" (or adaptor), so when we do that we can make the name change part of the project.

description = 'Reactive IPC Composition Layer Implementation'
dependencies {
// ripc-tcp
compile project(":ripc-transport-netty4")
compile project(":ripc-protocol-tcp")
compile 'io.reactivex:rxjava:1.0.8'
compile 'io.reactivex:rxjava-reactive-streams:0.5.0'
}
}

// RxJava 1.0
compile "io.reactivex:rxjava:$rxjava1Version"
project('ripc-rxjava1-examples') {
description = 'Reactive IPC Composition Layer examples'

compileTestJava {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add compileJava compat for 1.7.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I did not change the build file much from the version currently in the master. Let me look at it again to see what else should be modified/added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made changes to the build file, have a look.

sourceCompatibility = 1.8
targetCompatibility = 1.8
}

dependencies {
// ripc-tcp
compile project(":ripc-rxjava1")
compile project(":ripc-transport-netty4-examples")
}
}

Expand All @@ -125,7 +155,7 @@ configure(rootProject) {
it.tasks.getByName("jar")
}
}
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
options.memberLevel = JavadocMemberLevel.PROTECTED
options.author = true
options.header = rootProject.description
//options.overview = "src/api/overview.html"
Expand Down
11 changes: 0 additions & 11 deletions ripc-core/src/main/java/io/ripc/core/Consumer.java

This file was deleted.

9 changes: 0 additions & 9 deletions ripc-core/src/main/java/io/ripc/core/Function.java

This file was deleted.

26 changes: 0 additions & 26 deletions ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java

This file was deleted.

21 changes: 0 additions & 21 deletions ripc-core/src/main/java/io/ripc/core/Specification.java

This file was deleted.

9 changes: 0 additions & 9 deletions ripc-core/src/main/java/io/ripc/core/Supplier.java

This file was deleted.

9 changes: 0 additions & 9 deletions ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java

This file was deleted.

3 changes: 0 additions & 3 deletions ripc-core/src/main/java/io/ripc/core/package-info.java

This file was deleted.

48 changes: 48 additions & 0 deletions ripc-core/src/main/java/io/ripc/internal/Publishers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.ripc.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* Temporary utility class for creating and transforming {@link Publisher}s.
*/
public class Publishers {

public static <T> Publisher<T> just(final T value) {
return new Publisher<T>() {
@Override
public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onNext(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a positive demand check here per the spec. The Specifications class was intended to be the helper that aggregated these checks. This will be an ongoing issue (proper adherence to the spec) any time we implement helpers in the core.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, just to be clear, the implementation is not feature complete. It was a strawman to start discussion in light of all layers being implemented.

The Specifications class was intended to be the helper that aggregated these checks.

Not sure if having an independent Specifications class helps in this case. How does it makes sure that everyone abides by the spec?

s.onComplete();
}

@Override
public void cancel() {
}
});
}
};
}

public static <T> Publisher<T> error(final Throwable t) {
return new Publisher<T>() {
@Override
public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onError(t);
}

@Override
public void cancel() {
}
});
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.ripc.core.io;
package io.ripc.io;

import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
Expand Down
3 changes: 0 additions & 3 deletions ripc-core/src/test/java/io/ripc/core/package-info.java

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.ripc.protocol.tcp;

import org.reactivestreams.Publisher;

/**
* An abstraction for a TCP connection.
*
* @param <R> The type of objects read from this connection.
* @param <W> The type of objects written to this connection.
*/
public interface TcpConnection<R, W> extends Publisher<R> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why having a separate tcp and ripc-core ? I would suggest just removing all Tcp* prefixes and merge core and tcp protocol. Looking at the artifacts there is nothing specific yet to Tcp. Same for Server and Client, does it have to be in separate module ? Even in a case of File streaming I could see the same abstraction working as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why having a separate tcp and ripc-core

Have a look at issue #18 where we are suggesting a different project structure.
ripc-core would have protocol neutral classes and different protocols will be different projects.

Looking at the artifacts there is nothing specific yet to Tcp

The implementations are TCP specific. Connection concept is protocol specific (connection based protocols). UDP would not have a connection as an example.

Same for Server and Client, does it have to be in separate module?

There is no client as of now. Yeah, they should be in a single module/project per protocol.

Even in a case of File streaming I could see the same abstraction working as well.

You mean file streaming over TCP, HTTP?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think about the channel from Netty then or from Java NIO, there is absolutely no need to replicate write for every sort of IO, it's not a big deal but its an API question. It would be like saying we have different type of Subscriber in Reactive Stream for Synchronous and Asynchronous ones. UDP is also handled through Channel. So maybe Channel is a better name overall if we want a common denominator.
At this module level we are API centric not Application centric anyway IMO, feel free to name it RxTcpConnection in rxjava tho if you bring specific value to this.

You mean file streaming over TCP, HTTP?
No, I am thinking about reading and writing a File. E.g. Chronicle or other File Memory Mapped IPC used in high throughput apps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I'm not saying specific protocol don't have their modules, but we should have a common notion living in ripc core, named Connection or Channel or Pipe or Flow or IO or whatever.
If we find anything specific to TCP in addition to write and subscribe, e.g. UDP or 0MQ specific helpers they will certainly be able to extend this common artifact. It will bring much more value to the core and a bunch of candidates protocol won't even bother going that far in their first iteration. RIPC is about connecting and moving data between processes so we must make sure the core interface represents that. Server and Client can also do the same, and you already have this notion in RxNetty (as in reactor-net), another qualifier as generic IMO is Recipient and Sender. This is inherent to any communication.

Besides from the place of the components and the naming, I'm equally satisfied with the current both PR which are converging on most of the concept and I find your implementation quite smart (using the Netty user fired event to subscribe was a nice trick 👍 ). I'm in fact already working at mapping the concept in reactor-net to make the transition later easy to the whole concept. But yes as an API this is also very important to set the right expectations. I find the interception a bit more flexible in the alternative PR we issued but beyond that the concepts is really cool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the disconnect here. I think what you are mentioning is more of a commonality in an implementation i.e. every protocol using a socket/channel to read/write data. What this PR is defining is the client/server API for different protocols to be used by the users of reactive-ipc. From an API point of view, different protocols are very different and atleast while implementing UDP, TCP, HTTP, I have not seen commonalities worth defining a common contract across these protocols. This is the reason I am removing all commonalities across protocols in RxNetty w.r.t. client/server APIs. I would recommend refraining from creating these common abstractions unless we have multiple protocols implemented and we see value in such abstractions.


/**
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by
* this stream are flushed on completion of the stream.
*
* @param data Data stream to write.
*
* @return Result of write.
*/
Publisher<Void> write(Publisher<W> data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer writeWith/writeFrom to signal the intent clearly that we don't write a publisher but from/with a publisher (writeFrom). I think writeWith removes the confusion around single writer vs multiplexing too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer writeWith/writeFrom to signal the intent clearly that we don't write a publisher but from/with a publisher (writeFrom)

I think that would be pedantic. A Publisher is a source of data. I think for people dealing with asynchronous streams, it is pretty clear that in this case we are writing data from the passed source/stream and not the source itself.

I think writeWith removes the confusion around single writer vs multiplexing too.

How so?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are dealing with an API not an application. So yes pedantic is a qualifier it should have. Then writeWith is better than writeFrom to defuse any possibility to think it can be called once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us defer this discussion for now and see if we have any other disagreement before we can merge this PR. I would like to move forward with code for now and we can always bike-shed on names later :)


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.ripc.protocol.tcp;

import org.reactivestreams.Publisher;

public interface TcpHandler<R, W> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general rule, I'm not a fan of duplicating names in both packages and class names. It also doesn't "handle" Tcp, it handles a Connection. If needing a qualifier to Handler, it should probably be ConnectionHandler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I added Tcp prefix was that Handler by itself is confusing when someone starts creating two servers with different transports in the same code. In that scenario they have to use the fully qualified name for one of the handlers. Even ConnectionHandler is ambiguous when we have a WebSocket/HTTP/2 connection handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I added Tcp prefix was that Handler by itself is confusing when someone starts creating two servers with different transports in the same code.

Good point.

Maybe this should be applied across-the-board so that TcpConnection doesn't interfere with WebSocketConnection et al?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be applied across-the-board so that TcpConnection doesn't interfere with WebSocketConnection et al?

Indeed, it will be.


Publisher<Void> handle(TcpConnection<R, W> connection);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.ripc.protocol.tcp;

public interface TcpInterceptor<I, O, II, OO> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't have 4 generics placeholders. 2 is sufficient with 2 transformative ones on the method. Using this in practice will be horrific if the names are of any length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that 4 generic types are verbose but I don't see how moving two to the method level will make it any different. The method definition then becomes horrific.

Finagle makes it simpler for interceptors that do not change the types, by having a SimpleFilter which has the same in & out types.


TcpHandler<II, OO> intercept(TcpHandler<I, O> handler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was envisioning interceptors being analogous to Netty's pipeline feature. Pipelines can be dynamically altered and new ones are created when new connections are made. How would we use this intercept mechanism to wrap each new connection dynamically (especially those that come in as a result of an automatic reconnect after a client connection has died)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was envisioning interceptors being analogous to Netty's pipeline feature.

Yes and No. Yes because it can change the types of input and output on the channel (read and write in netty pipeline) and no because it does not intercept any other channel lifecycle event apart from read/write.
Conceptually, interceptors are application level constructs where as netty channels are network level constructs thus needing more feedback around channel lifecycle.

How would we use this intercept mechanism to wrap each new connection dynamically

Have a look at the example here which is wrapping over the next interceptor/handler in the chain and hence is in complete control of what connection to pass downstream.

especially those that come in as a result of an automatic reconnect after a client connection has died

I did not quiet understand, can you elaborate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not quiet understand, can you elaborate?

Reconnection (and connection management in general) is an important use case we have that currently powers the Spring STOMP support. We actually support dynamic reconnection to a completely different address if one is down, so whatever we do with regards to lifecycle events would have to be aware that a connection is not a one-time event and that setup and teardown of connections can happen irregardless of the lifecycle of the client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's also the question of "what exactly are we intercepting" and "to what end?". Are we intercepting only data messages coming in and going out? Those are two different things, actually: reading requires a subscribe and writing an entirely separate Publisher with a possibly completely different type. In the TCP case they'll often be the same (ex: ByteBuf) but they likely won't be in request/response servers.

If we're intercepting for the purposes of providing, say authorization, then we're doing more than just message transformation because we would be possibly closing the connection (canceling the Subscription).

I'm wondering if "intercepting" shouldn't be done at the site of the event (Subscription.request and onNext) and as a translation/codec/whatever rather than incurring the overhead of wrapping, delegates, etc...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sketched out providing for interceptors on the connection by separating the reading and writing. This allows for different interceptors (and even multiple interceptors) on the read and write which I think will be necessary since reading and writing on a connection is often unbalanced (e.g. more emphasis is often placed on reading data and figuring out what to do than on what's written out to the client, since it can often be nothing more than an ACK).

https://gist.github.com/jbrisbin/686e565660b27b9b0f91

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reconnection (and connection management in general) is an important use case we have that currently powers the Spring STOMP support.

Actually, it is a very mainstream usecase in almost all applications. The way we do it in Netflix is by introducing a loadbalancer that does the determination of how to retry/load balance. Since, I have not even introduced the client APIs yet, I would like to keep this discussion for the client APIs. The API here is purely for the server.

I think there's also the question of "what exactly are we intercepting" and "to what end?"

In this particular case (TCP server interceptors) we are intercepting "connection processing" that will include but not limited to:

  • Modifying input/output.
  • Introspecting input/output from the point of view of insights.
  • Short-circuiting connection processing i.e. the connection does not even reach the actual handler.
  • Metrics gathering of connection processing times.
  • Logging

I'm wondering if "intercepting" shouldn't be done at the site of the event (Subscription.request and onNext) and as a translation/codec/whatever rather than incurring the overhead of wrapping, delegates, etc...

I think you are looking at it mostly from the point of view reading/writing data, in which case it may be ok to intercept read/write separately. However, that approach does not work well when we start talking about short-circuiting. The current approach of treating server as a function (issue #15) covers all the usecases. I would like to see if you have a usecase which can not be covered by that design.

This allows for different interceptors (and even multiple interceptors) on the read and write

The current design does not restrict the server to a single interceptor. I have described above the issue with designing interceptors as being applied on read and write.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.ripc.protocol.tcp;

import java.util.concurrent.atomic.AtomicBoolean;

public abstract class TcpServer<R, W> {

protected final TcpHandler<R, W> thisHandler;
protected final AtomicBoolean started;

protected TcpServer() {
thisHandler = null;
this.started = new AtomicBoolean();
}

public final TcpServer<R, W> start(TcpHandler<R, W> handler) {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Server already started");
}

doStart(handler);
return this;
}

public final void startAndAwait(TcpHandler<R, W> handler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be mixing blocking semantics into a fundamentally non-blocking server? It's likely that few "real" situations would use this anyway since there would be other means of keeping the JVM alive (a la Spring Boot, which would definitely not use an await() method. Can't we accomplish this via helpers that take a Publisher and block until Subscriber.onComplete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me & @benjchristensen talked about it and unless we model the server as a Publisher of ServerState which is Start and Stop it seems like an overkill since that would mean someone has to subscribe to the state observable to start the server. Since, there will only be two state Start and Stop, it sounds unnecessary to me to model it that way.
Are there any other usecases apart from await that you would see us gaining from if we move to returning a Publisher from start()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't thinking one would have to subscribe to start a server. I was just thinking about a callback for shutdown rather than using blocking semantics (at least as part of the abstractions of the kernel...baked in, if you will).

But that does bring up the question of codifying lifecycle events in the kernel at all. IMO these things would be better off in the transport layer. I wonder if it's possible to reduce all the things about starting and stopping a server and connecting and closing a client into a common abstraction that simply exposes start and stop. If that's all that's modeled, I'm not sure it's even necessary since the kernel components could simply accept the handlers and Publishers directly. Each transport would still have to provide special methods for configuration anyway so it seems somewhat unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we need to be more judicious in our use of RS components and, just like the handler, look more to simple callbacks for all situations where that's possible and only use RS components for those key critical path elements that need backpressure support. Everything else could simply leverage simply callbacks and functional transformations (e.g. Consumer and Function).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Based on this, can you suggest what changes would you make to the current TcpServer?

start(handler);
awaitShutdown();
}

public final boolean shutdown() {
return !started.compareAndSet(true, false) || doShutdown();
}

public abstract void awaitShutdown();

public abstract boolean doShutdown();

protected abstract TcpServer<R, W> doStart(TcpHandler<R, W> handler);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.rpc.rx.protocol.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.ripc.protocol.tcp.TcpServer;
import io.ripc.rx.protocol.tcp.RxTcpServer;
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
import rx.Observable;

import static java.nio.charset.Charset.*;

public class RxTcpServerSample {

public static void main(String[] args) throws InterruptedException {

TcpServer<ByteBuf, ByteBuf> transport = Netty4TcpServer.<ByteBuf, ByteBuf>create(0);

RxTcpServer.create(transport)
.startAndAwait(connection -> connection.flatMap(bb -> {
String msgStr = "Hello " + bb.toString(defaultCharset());
ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes());
return connection.write(Observable.just(msg).doOnCompleted(() -> System.out.println("Done!")));
}));
}
}
Loading