-
Notifications
You must be signed in to change notification settings - Fork 13
Proposal for TCP server at all layers (core, transport & rxjava1) #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
89bc5f1
46892b0
70956d4
44b860b
19c8639
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,11 +48,6 @@ allprojects { | |
"-Xlint:-rawtypes" // TODO enable and fix warnings | ||
] | ||
|
||
compileJava { | ||
sourceCompatibility = 1.7 | ||
targetCompatibility = 1.7 | ||
} | ||
|
||
compileTestJava { | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
|
@@ -87,9 +82,29 @@ project('ripc-protocol-tcp') { | |
dependencies { | ||
// ripc-core | ||
compile project(":ripc-core") | ||
compile "org.slf4j:slf4j-api:1.7.6" | ||
testCompile 'io.reactivex:rxjava:1.0.8' | ||
testCompile 'io.reactivex:rxjava-reactive-streams:0.5.0' | ||
} | ||
} | ||
|
||
project('ripc-transport-netty4-examples') { | ||
description = 'Reactive IPC TCP Component examples' | ||
|
||
compileTestJava { | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
} | ||
|
||
dependencies { | ||
// ripc-core | ||
compile project(":ripc-transport-netty4") | ||
compile 'io.reactivex:rxjava:1.0.8' | ||
compile 'io.reactivex:rxjava-reactive-streams:0.5.0' | ||
compile 'org.slf4j:slf4j-simple:1.7.6' | ||
} | ||
} | ||
|
||
project('ripc-transport-netty4') { | ||
description = 'Reactive IPC Netty 4.x Transport Implementation' | ||
dependencies { | ||
|
@@ -98,17 +113,32 @@ project('ripc-transport-netty4') { | |
|
||
// Netty | ||
compile "io.netty:netty-all:$nettyVersion" | ||
compile "org.slf4j:slf4j-api:1.7.6" | ||
} | ||
} | ||
|
||
project('ripc-composition-rxjava1') { | ||
project('ripc-rxjava1') { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking out the reference to "composition" doesn't tell me what this module's responsibility is now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason to remove "composition" was to not indicate that it is just for functional composition. In the other issue, we are discussing to rename the layer to "adapter" (or adaptor), so when we do that we can make the name change part of the project. |
||
description = 'Reactive IPC Composition Layer Implementation' | ||
dependencies { | ||
// ripc-tcp | ||
compile project(":ripc-transport-netty4") | ||
compile project(":ripc-protocol-tcp") | ||
compile 'io.reactivex:rxjava:1.0.8' | ||
compile 'io.reactivex:rxjava-reactive-streams:0.5.0' | ||
} | ||
} | ||
|
||
// RxJava 1.0 | ||
compile "io.reactivex:rxjava:$rxjava1Version" | ||
project('ripc-rxjava1-examples') { | ||
description = 'Reactive IPC Composition Layer examples' | ||
|
||
compileTestJava { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should add compileJava compat for 1.7. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I did not change the build file much from the version currently in the master. Let me look at it again to see what else should be modified/added. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have made changes to the build file, have a look. |
||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
} | ||
|
||
dependencies { | ||
// ripc-tcp | ||
compile project(":ripc-rxjava1") | ||
compile project(":ripc-transport-netty4-examples") | ||
} | ||
} | ||
|
||
|
@@ -125,7 +155,7 @@ configure(rootProject) { | |
it.tasks.getByName("jar") | ||
} | ||
} | ||
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED | ||
options.memberLevel = JavadocMemberLevel.PROTECTED | ||
options.author = true | ||
options.header = rootProject.description | ||
//options.overview = "src/api/overview.html" | ||
|
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.ripc.internal; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
/** | ||
* Temporary utility class for creating and transforming {@link Publisher}s. | ||
*/ | ||
public class Publishers { | ||
|
||
public static <T> Publisher<T> just(final T value) { | ||
return new Publisher<T>() { | ||
@Override | ||
public void subscribe(final Subscriber<? super T> s) { | ||
s.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
s.onNext(value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a positive demand check here per the spec. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, just to be clear, the implementation is not feature complete. It was a strawman to start discussion in light of all layers being implemented.
Not sure if having an independent Specifications class helps in this case. How does it makes sure that everyone abides by the spec? |
||
s.onComplete(); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
}); | ||
} | ||
}; | ||
} | ||
|
||
public static <T> Publisher<T> error(final Throwable t) { | ||
return new Publisher<T>() { | ||
@Override | ||
public void subscribe(final Subscriber<? super T> s) { | ||
s.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
s.onError(t); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
}); | ||
} | ||
}; | ||
} | ||
} |
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* An abstraction for a TCP connection. | ||
* | ||
* @param <R> The type of objects read from this connection. | ||
* @param <W> The type of objects written to this connection. | ||
*/ | ||
public interface TcpConnection<R, W> extends Publisher<R> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why having a separate tcp and ripc-core ? I would suggest just removing all Tcp* prefixes and merge core and tcp protocol. Looking at the artifacts there is nothing specific yet to Tcp. Same for Server and Client, does it have to be in separate module ? Even in a case of File streaming I could see the same abstraction working as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Have a look at issue #18 where we are suggesting a different project structure.
The implementations are TCP specific. Connection concept is protocol specific (connection based protocols). UDP would not have a connection as an example.
There is no client as of now. Yeah, they should be in a single module/project per protocol.
You mean file streaming over TCP, HTTP? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Think about the channel from Netty then or from Java NIO, there is absolutely no need to replicate write for every sort of IO, it's not a big deal but its an API question. It would be like saying we have different type of Subscriber in Reactive Stream for Synchronous and Asynchronous ones. UDP is also handled through Channel. So maybe Channel is a better name overall if we want a common denominator.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although I'm not saying specific protocol don't have their modules, but we should have a common notion living in ripc core, named Connection or Channel or Pipe or Flow or IO or whatever. Besides from the place of the components and the naming, I'm equally satisfied with the current both PR which are converging on most of the concept and I find your implementation quite smart (using the Netty user fired event to subscribe was a nice trick 👍 ). I'm in fact already working at mapping the concept in reactor-net to make the transition later easy to the whole concept. But yes as an API this is also very important to set the right expectations. I find the interception a bit more flexible in the alternative PR we issued but beyond that the concepts is really cool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see the disconnect here. I think what you are mentioning is more of a commonality in an implementation i.e. every protocol using a socket/channel to read/write data. What this PR is defining is the client/server API for different protocols to be used by the users of reactive-ipc. From an API point of view, different protocols are very different and atleast while implementing UDP, TCP, HTTP, I have not seen commonalities worth defining a common contract across these protocols. This is the reason I am removing all commonalities across protocols in RxNetty w.r.t. client/server APIs. I would recommend refraining from creating these common abstractions unless we have multiple protocols implemented and we see value in such abstractions. |
||
|
||
/** | ||
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by | ||
* this stream are flushed on completion of the stream. | ||
* | ||
* @param data Data stream to write. | ||
* | ||
* @return Result of write. | ||
*/ | ||
Publisher<Void> write(Publisher<W> data); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer writeWith/writeFrom to signal the intent clearly that we don't write a publisher but from/with a publisher (writeFrom). I think writeWith removes the confusion around single writer vs multiplexing too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think that would be pedantic. A
How so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are dealing with an API not an application. So yes pedantic is a qualifier it should have. Then writeWith is better than writeFrom to defuse any possibility to think it can be called once. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us defer this discussion for now and see if we have any other disagreement before we can merge this PR. I would like to move forward with code for now and we can always bike-shed on names later :) |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
public interface TcpHandler<R, W> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a general rule, I'm not a fan of duplicating names in both packages and class names. It also doesn't "handle" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I added Tcp prefix was that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good point. Maybe this should be applied across-the-board so that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Indeed, it will be. |
||
|
||
Publisher<Void> handle(TcpConnection<R, W> connection); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
public interface TcpInterceptor<I, O, II, OO> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't have 4 generics placeholders. 2 is sufficient with 2 transformative ones on the method. Using this in practice will be horrific if the names are of any length. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that 4 generic types are verbose but I don't see how moving two to the method level will make it any different. The method definition then becomes horrific. Finagle makes it simpler for interceptors that do not change the types, by having a |
||
|
||
TcpHandler<II, OO> intercept(TcpHandler<I, O> handler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was envisioning interceptors being analogous to Netty's pipeline feature. Pipelines can be dynamically altered and new ones are created when new connections are made. How would we use this intercept mechanism to wrap each new connection dynamically (especially those that come in as a result of an automatic reconnect after a client connection has died)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes and No. Yes because it can change the types of input and output on the channel (read and write in netty pipeline) and no because it does not intercept any other channel lifecycle event apart from read/write.
Have a look at the example here which is wrapping over the next interceptor/handler in the chain and hence is in complete control of what connection to pass downstream.
I did not quiet understand, can you elaborate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Reconnection (and connection management in general) is an important use case we have that currently powers the Spring STOMP support. We actually support dynamic reconnection to a completely different address if one is down, so whatever we do with regards to lifecycle events would have to be aware that a connection is not a one-time event and that setup and teardown of connections can happen irregardless of the lifecycle of the client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's also the question of "what exactly are we intercepting" and "to what end?". Are we intercepting only data messages coming in and going out? Those are two different things, actually: reading requires a If we're intercepting for the purposes of providing, say authorization, then we're doing more than just message transformation because we would be possibly closing the connection (canceling the I'm wondering if "intercepting" shouldn't be done at the site of the event ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I sketched out providing for interceptors on the connection by separating the reading and writing. This allows for different interceptors (and even multiple interceptors) on the read and write which I think will be necessary since reading and writing on a connection is often unbalanced (e.g. more emphasis is often placed on reading data and figuring out what to do than on what's written out to the client, since it can often be nothing more than an ACK). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Actually, it is a very mainstream usecase in almost all applications. The way we do it in Netflix is by introducing a loadbalancer that does the determination of how to retry/load balance. Since, I have not even introduced the client APIs yet, I would like to keep this discussion for the client APIs. The API here is purely for the server.
In this particular case (TCP server interceptors) we are intercepting "connection processing" that will include but not limited to:
I think you are looking at it mostly from the point of view reading/writing data, in which case it may be ok to intercept read/write separately. However, that approach does not work well when we start talking about short-circuiting. The current approach of treating server as a function (issue #15) covers all the usecases. I would like to see if you have a usecase which can not be covered by that design.
The current design does not restrict the server to a single interceptor. I have described above the issue with designing interceptors as being applied on read and write. |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public abstract class TcpServer<R, W> { | ||
|
||
protected final TcpHandler<R, W> thisHandler; | ||
protected final AtomicBoolean started; | ||
|
||
protected TcpServer() { | ||
thisHandler = null; | ||
this.started = new AtomicBoolean(); | ||
} | ||
|
||
public final TcpServer<R, W> start(TcpHandler<R, W> handler) { | ||
if (!started.compareAndSet(false, true)) { | ||
throw new IllegalStateException("Server already started"); | ||
} | ||
|
||
doStart(handler); | ||
return this; | ||
} | ||
|
||
public final void startAndAwait(TcpHandler<R, W> handler) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we be mixing blocking semantics into a fundamentally non-blocking server? It's likely that few "real" situations would use this anyway since there would be other means of keeping the JVM alive (a la Spring Boot, which would definitely not use an await() method. Can't we accomplish this via helpers that take a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Me & @benjchristensen talked about it and unless we model the server as a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't thinking one would have to subscribe to start a server. I was just thinking about a callback for shutdown rather than using blocking semantics (at least as part of the abstractions of the kernel...baked in, if you will). But that does bring up the question of codifying lifecycle events in the kernel at all. IMO these things would be better off in the transport layer. I wonder if it's possible to reduce all the things about starting and stopping a server and connecting and closing a client into a common abstraction that simply exposes start and stop. If that's all that's modeled, I'm not sure it's even necessary since the kernel components could simply accept the handlers and Publishers directly. Each transport would still have to provide special methods for configuration anyway so it seems somewhat unnecessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking we need to be more judicious in our use of RS components and, just like the handler, look more to simple callbacks for all situations where that's possible and only use RS components for those key critical path elements that need backpressure support. Everything else could simply leverage simply callbacks and functional transformations (e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Based on this, can you suggest what changes would you make to the current |
||
start(handler); | ||
awaitShutdown(); | ||
} | ||
|
||
public final boolean shutdown() { | ||
return !started.compareAndSet(true, false) || doShutdown(); | ||
} | ||
|
||
public abstract void awaitShutdown(); | ||
|
||
public abstract boolean doShutdown(); | ||
|
||
protected abstract TcpServer<R, W> doStart(TcpHandler<R, W> handler); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package io.rpc.rx.protocol.tcp; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
import io.netty.buffer.Unpooled; | ||
import io.ripc.protocol.tcp.TcpServer; | ||
import io.ripc.rx.protocol.tcp.RxTcpServer; | ||
import io.ripc.transport.netty4.tcp.Netty4TcpServer; | ||
import rx.Observable; | ||
|
||
import static java.nio.charset.Charset.*; | ||
|
||
public class RxTcpServerSample { | ||
|
||
public static void main(String[] args) throws InterruptedException { | ||
|
||
TcpServer<ByteBuf, ByteBuf> transport = Netty4TcpServer.<ByteBuf, ByteBuf>create(0); | ||
|
||
RxTcpServer.create(transport) | ||
.startAndAwait(connection -> connection.flatMap(bb -> { | ||
String msgStr = "Hello " + bb.toString(defaultCharset()); | ||
ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes()); | ||
return connection.write(Observable.just(msg).doOnCompleted(() -> System.out.println("Done!"))); | ||
})); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO We shouldn't mix layers. We should provide mocks and stick to RS APIs in layers other than composition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aah this was a left-over from me having the example (as a test) inside ripc-protocol-tcp demonstrating the usage. This is a test dep though. I will remove it, it isn't used.