-
Notifications
You must be signed in to change notification settings - Fork 13
Proposal for TCP server at all layers (core, transport & rxjava1) #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
89bc5f1
46892b0
70956d4
44b860b
19c8639
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,11 +48,6 @@ allprojects { | |
"-Xlint:-rawtypes" // TODO enable and fix warnings | ||
] | ||
|
||
compileJava { | ||
sourceCompatibility = 1.7 | ||
targetCompatibility = 1.7 | ||
} | ||
|
||
compileTestJava { | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
|
@@ -87,9 +82,29 @@ project('ripc-protocol-tcp') { | |
dependencies { | ||
// ripc-core | ||
compile project(":ripc-core") | ||
compile "org.slf4j:slf4j-api:1.7.6" | ||
testCompile 'io.reactivex:rxjava:1.0.8' | ||
testCompile 'io.reactivex:rxjava-reactive-streams:0.3.0' | ||
} | ||
} | ||
|
||
project('ripc-transport-netty4-examples') { | ||
description = 'Reactive IPC TCP Component examples' | ||
|
||
compileTestJava { | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
} | ||
|
||
dependencies { | ||
// ripc-core | ||
compile project(":ripc-transport-netty4") | ||
compile 'io.reactivex:rxjava:1.0.8' | ||
compile 'io.reactivex:rxjava-reactive-streams:0.3.0' | ||
compile 'org.slf4j:slf4j-simple:1.7.6' | ||
} | ||
} | ||
|
||
project('ripc-transport-netty4') { | ||
description = 'Reactive IPC Netty 4.x Transport Implementation' | ||
dependencies { | ||
|
@@ -98,17 +113,32 @@ project('ripc-transport-netty4') { | |
|
||
// Netty | ||
compile "io.netty:netty-all:$nettyVersion" | ||
compile "org.slf4j:slf4j-api:1.7.6" | ||
} | ||
} | ||
|
||
project('ripc-composition-rxjava1') { | ||
project('ripc-rxjava1') { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking out the reference to "composition" doesn't tell me what this module's responsibility is now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason to remove "composition" was to not indicate that it is just for functional composition. In the other issue, we are discussing to rename the layer to "adapter" (or adaptor), so when we do that we can make the name change part of the project. |
||
description = 'Reactive IPC Composition Layer Implementation' | ||
dependencies { | ||
// ripc-tcp | ||
compile project(":ripc-transport-netty4") | ||
compile project(":ripc-protocol-tcp") | ||
compile 'io.reactivex:rxjava:1.0.8' | ||
compile 'io.reactivex:rxjava-reactive-streams:0.3.0' | ||
} | ||
} | ||
|
||
// RxJava 1.0 | ||
compile "io.reactivex:rxjava:$rxjava1Version" | ||
project('ripc-rxjava1-examples') { | ||
description = 'Reactive IPC Composition Layer examples' | ||
|
||
compileTestJava { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should add compileJava compat for 1.7. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I did not change the build file much from the version currently in the master. Let me look at it again to see what else should be modified/added. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have made changes to the build file, have a look. |
||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
} | ||
|
||
dependencies { | ||
// ripc-tcp | ||
compile project(":ripc-rxjava1") | ||
compile project(":ripc-transport-netty4-examples") | ||
} | ||
} | ||
|
||
|
@@ -125,7 +155,7 @@ configure(rootProject) { | |
it.tasks.getByName("jar") | ||
} | ||
} | ||
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED | ||
options.memberLevel = JavadocMemberLevel.PROTECTED | ||
options.author = true | ||
options.header = rootProject.description | ||
//options.overview = "src/api/overview.html" | ||
|
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.ripc.internal; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
/** | ||
* Temporary utility class for creating and transforming {@link Publisher}s. | ||
*/ | ||
public class Publishers { | ||
|
||
public static <T> Publisher<T> just(final T value) { | ||
return new Publisher<T>() { | ||
@Override | ||
public void subscribe(final Subscriber<? super T> s) { | ||
s.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
s.onNext(value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a positive demand check here per the spec. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, just to be clear, the implementation is not feature complete. It was a strawman to start discussion in light of all layers being implemented.
Not sure if having an independent Specifications class helps in this case. How does it makes sure that everyone abides by the spec? |
||
s.onComplete(); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
}); | ||
} | ||
}; | ||
} | ||
|
||
public static <T> Publisher<T> error(final Throwable t) { | ||
return new Publisher<T>() { | ||
@Override | ||
public void subscribe(final Subscriber<? super T> s) { | ||
s.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
s.onError(t); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
}); | ||
} | ||
}; | ||
} | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,53 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
import io.ripc.core.io.Buffer; | ||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* A {@code Connection} is a Reactive Streams {@link org.reactivestreams.Publisher} that provides subscribers with | ||
* inbound data and exposes the {@link #write(org.reactivestreams.Publisher)} method for sending outbound data. | ||
* An abstraction for a TCP connection. | ||
* | ||
* @param <R> The type of objects read from this connection. | ||
* @param <W> The type of objects written to this connection. | ||
*/ | ||
public interface Connection<B> extends Publisher<Buffer<B>> { | ||
public interface Connection<R, W> extends Publisher<R> { | ||
|
||
/** | ||
* Send outbound data using the Reactive Streams {@code Publisher} contract. | ||
* | ||
* @param data publisher of outbound data | ||
*/ | ||
void write(Publisher<Buffer<B>> data); | ||
/** | ||
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by | ||
* this stream are flushed on completion of the stream. | ||
* | ||
* @param data Data stream to write. | ||
* | ||
* @return Result of write. | ||
*/ | ||
Publisher<Void> write(Publisher<W> data); | ||
|
||
/** | ||
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All written items are | ||
* flushed whenever the passed {@code flushSelector} returns {@code true} | ||
* | ||
* @param data Data stream to write. | ||
* @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns | ||
* {@code true} then all items written till now are flushed. | ||
* | ||
* @return Result of write. | ||
*/ | ||
Publisher<Void> write(Publisher<W> data, FlushSelector<W> flushSelector); | ||
|
||
/** | ||
* A function that is used for determining when a flush has to be invoked on the underlying channel. | ||
* | ||
* @param <W> Type of items emitted by the stream using this selector. | ||
*/ | ||
interface FlushSelector<W> { | ||
|
||
/** | ||
* Selects whether flush should be invoked on the channel. | ||
* | ||
* @param count The index of this item. Starts with 1. | ||
* @param lastWrittenItem Item which was last written before calling this selector. | ||
* | ||
* @return {@code true} if flush should be invoked. | ||
*/ | ||
boolean select(long count, W lastWrittenItem); | ||
|
||
} | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
public interface TcpHandler<R, W> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a general rule, I'm not a fan of duplicating names in both packages and class names. It also doesn't "handle" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I added Tcp prefix was that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good point. Maybe this should be applied across-the-board so that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Indeed, it will be. |
||
|
||
Publisher<Void> handle(Connection<R, W> connection); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does the returned There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a server construct. One server can only ever have a single handler.
The only subscriber to this This does not wire the connection close. Netty4TcpServer.create(0)
.start(connection -> Publishers.error(new IllegalStateException())); OTOH, this does as it is wiring the write to the returned Netty4TcpServer.create(0)
.start(connection -> connection.write(connection)); |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.ripc.protocol.tcp; | ||
|
||
public interface TcpInterceptor<I, O, II, OO> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't have 4 generics placeholders. 2 is sufficient with 2 transformative ones on the method. Using this in practice will be horrific if the names are of any length. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that 4 generic types are verbose but I don't see how moving two to the method level will make it any different. The method definition then becomes horrific. Finagle makes it simpler for interceptors that do not change the types, by having a |
||
|
||
TcpHandler<II, OO> intercept(TcpHandler<I, O> handler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was envisioning interceptors being analogous to Netty's pipeline feature. Pipelines can be dynamically altered and new ones are created when new connections are made. How would we use this intercept mechanism to wrap each new connection dynamically (especially those that come in as a result of an automatic reconnect after a client connection has died)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes and No. Yes because it can change the types of input and output on the channel (read and write in netty pipeline) and no because it does not intercept any other channel lifecycle event apart from read/write.
Have a look at the example here which is wrapping over the next interceptor/handler in the chain and hence is in complete control of what connection to pass downstream.
I did not quiet understand, can you elaborate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Reconnection (and connection management in general) is an important use case we have that currently powers the Spring STOMP support. We actually support dynamic reconnection to a completely different address if one is down, so whatever we do with regards to lifecycle events would have to be aware that a connection is not a one-time event and that setup and teardown of connections can happen irregardless of the lifecycle of the client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's also the question of "what exactly are we intercepting" and "to what end?". Are we intercepting only data messages coming in and going out? Those are two different things, actually: reading requires a If we're intercepting for the purposes of providing, say authorization, then we're doing more than just message transformation because we would be possibly closing the connection (canceling the I'm wondering if "intercepting" shouldn't be done at the site of the event ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I sketched out providing for interceptors on the connection by separating the reading and writing. This allows for different interceptors (and even multiple interceptors) on the read and write which I think will be necessary since reading and writing on a connection is often unbalanced (e.g. more emphasis is often placed on reading data and figuring out what to do than on what's written out to the client, since it can often be nothing more than an ACK). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Actually, it is a very mainstream usecase in almost all applications. The way we do it in Netflix is by introducing a loadbalancer that does the determination of how to retry/load balance. Since, I have not even introduced the client APIs yet, I would like to keep this discussion for the client APIs. The API here is purely for the server.
In this particular case (TCP server interceptors) we are intercepting "connection processing" that will include but not limited to:
I think you are looking at it mostly from the point of view reading/writing data, in which case it may be ok to intercept read/write separately. However, that approach does not work well when we start talking about short-circuiting. The current approach of treating server as a function (issue #15) covers all the usecases. I would like to see if you have a usecase which can not be covered by that design.
The current design does not restrict the server to a single interceptor. I have described above the issue with designing interceptors as being applied on read and write. |
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO We shouldn't mix layers. We should provide mocks and stick to RS APIs in layers other than composition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aah this was a left-over from me having the example (as a test) inside ripc-protocol-tcp demonstrating the usage. This is a test dep though. I will remove it, it isn't used.