Skip to content

Additional considerations around the first PR iteration #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ ext {

// Logging
slf4jVersion = '1.7.10'
logbackVersion = '1.1.2'
logbackVersion = '1.1.3'

// Network Transports
nettyVersion = '4.0.26.Final'

// Composition Libraries
rxjava1Version = '1.0.7'
rxjava1Version = '1.0.8'
reactorVersion = '2.0.0.RELEASE'

// Testing
mockitoVersion = '1.10.19'
Expand Down Expand Up @@ -59,6 +60,7 @@ allprojects {
}

repositories {
maven { url 'http://repo.spring.io/libs-release' }
jcenter()
mavenCentral()
}
Expand All @@ -69,6 +71,7 @@ subprojects { subproject ->
// Testing
testCompile "junit:junit:$junitVersion",
"org.hamcrest:hamcrest-library:1.3",
"org.mockito:mockito-core:$mockitoVersion",
"org.slf4j:slf4j-api:$slf4jVersion"
testRuntime "ch.qos.logback:logback-classic:$logbackVersion"
}
Expand Down Expand Up @@ -108,7 +111,19 @@ project('ripc-composition-rxjava1') {
compile project(":ripc-transport-netty4")

// RxJava 1.0
compile "io.reactivex:rxjava:$rxjava1Version"
compile "io.reactivex:rxjava:$rxjava1Version",
'io.reactivex:rxjava-reactive-streams:0.3.0'
}
}

project('ripc-composition-reactor') {
description = 'Reactive IPC Composition Layer Implementation'
dependencies {
// ripc-tcp
compile project(":ripc-transport-netty4")

// Reactor
compile "io.projectreactor:reactor-stream:$reactorVersion"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.ripc.composition.reactor.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import reactor.io.buffer.Buffer;

/**
* Created by jbrisbin on 3/30/15.
*/
@ChannelHandler.Sharable
public class ReactorBufferChannelHandler extends ChannelDuplexHandler {

public static final ChannelDuplexHandler INSTANCE = new ReactorBufferChannelHandler();

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (Buffer.class.isAssignableFrom(msg.getClass())) {
Buffer reactorBuf = (Buffer) msg;
ByteBuf nettyBuf = ctx.alloc().buffer(reactorBuf.capacity());
nettyBuf.writeBytes(reactorBuf.asBytes());
msg = nettyBuf;
}
super.write(ctx, msg, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (ByteBuf.class.isAssignableFrom(msg.getClass())) {
msg = new Buffer(((ByteBuf) msg).nioBuffer());
}
super.channelRead(ctx, msg);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.ripc.composition.reactor.tcp;

import io.ripc.composition.reactor.tcp.connection.ReactorTcpConnection;
import io.ripc.protocol.tcp.TcpServer;
import io.ripc.transport.netty4.tcp.server.NettyTcpServer;
import org.reactivestreams.Subscriber;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;

/**
* An implementation of a TCP server that emits new connections as they are created.
*/
public class ReactorTcpServer<R, W> extends Stream<ReactorTcpConnection<R, W>> {

private final Broadcaster<ReactorTcpConnection<R, W>> connections = Broadcaster.create();

private TcpServer<ReactorTcpConnection<R, W>> server;

public ReactorTcpServer(int port, Codec<Buffer, R, W> codec) {
this.server = NettyTcpServer.listen(port)
.intercept(conn -> new ReactorTcpConnection<>(conn, codec))
.handler(connections::onNext);
}

public static <R, W> ReactorTcpServer<R, W> listen(int port, Codec<Buffer, R, W> codec) {
return new ReactorTcpServer<>(port, codec);
}

public void shutdown() {
server.shutdown();
}

@Override
public void subscribe(Subscriber<? super ReactorTcpConnection<R, W>> s) {
server.start();
connections.subscribe(s);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.ripc.composition.reactor.tcp.connection;

import io.netty.channel.ChannelHandlerContext;
import io.ripc.composition.reactor.tcp.ReactorBufferChannelHandler;
import io.ripc.protocol.tcp.connection.TcpConnection;
import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener;
import io.ripc.transport.netty4.listener.ChannelActiveListener;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;

/**
* Created by jbrisbin on 3/27/15.
*/
public class ReactorTcpConnection<R, W> {

private final Broadcaster<Object> writeComplete = Broadcaster.create();

private final TcpConnection connection;
private final Codec<Buffer, R, W> codec;
private final Stream<R> in;

@SuppressWarnings("unchecked")
public ReactorTcpConnection(TcpConnection connection, Codec<Buffer, R, W> codec) {
this.connection = connection;
this.codec = codec;

this.connection.addListener(new WriteCompleteListener() {
@SuppressWarnings("unchecked")
@Override
public boolean writeComplete(TcpConnection connection, long count, Object msg) {
writeComplete.onNext(msg);
return true;
}
});

this.connection.addListener(new ChannelActiveListener() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().addBefore("reactive-ipc-inbound",
"byteBufToBufferHandler",
ReactorBufferChannelHandler.INSTANCE);
}
});

this.in = Streams.wrap(connection.reader())
.map(new Function<Object, R>() {
Function<Buffer, R> decoder = (null != codec ? codec.decoder() : null);

@Override
public R apply(Object o) {
return (null != decoder ? decoder.apply((Buffer) o) : (R) o);
}
});
}

public Stream<R> in() {
return in;
}

public Stream<Object> out(Stream<W> out) {
connection.writer(out.observeComplete(v -> writeComplete.onComplete())
.when(Throwable.class, writeComplete::onError)
.map(codec.encoder()));
return writeComplete;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.ripc.composition.reactor;

import io.ripc.composition.reactor.tcp.ReactorTcpServer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.io.codec.StandardCodecs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Integration tests for Reactor implementations of RIPC components.
*/
public class ReactorTcpServerIntegrationTests {

private static final Logger LOG = LoggerFactory.getLogger(ReactorTcpServerIntegrationTests.class);

@Test
public void reactorTcpServerAcceptsData() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

ReactorTcpServer<String, String> server = ReactorTcpServer.listen(3000, StandardCodecs.STRING_CODEC);

server.log("connection")
.consume(conn -> conn.out(conn.in()
.log("in"))
.log("confirmation")
.observeComplete(v -> latch.countDown())
.consume(buf -> LOG.info("write confirmed: {}", buf)));

while (!latch.await(1, TimeUnit.SECONDS)) {
}

server.shutdown();
}

}
11 changes: 0 additions & 11 deletions ripc-core/src/main/java/io/ripc/core/Consumer.java

This file was deleted.

19 changes: 19 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/DemandCalculator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.ripc.core;

/**
* A {@code DemandCalculator} implementation is responsible for calculating what the demand value should be to send to
* {@link org.reactivestreams.Subscription#request(long)}.
*/
public interface DemandCalculator {

/**
* Calculate demand based on current pending backlog. A value {@code < 1} means "don't make any new requests" since
* values less than {@code 1} are illegal according to the Reactive Streams spec. A value of {@code >= 1} means "use
* this value as the demand".
*
* @param pending outstanding backlog of previous demand accumulations
* @return &lt 1 to indicate no requests should be performed, &gt= 1 to indicate positive demand
*/
long calculateDemand(long pending);

}
8 changes: 8 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/EventListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.ripc.core;

/**
* An {@code EventListener} is a generic component that has no default behavior. Components are meant to use specific
* subclasses of this interface to intercept events and perform appropriate behavior.
*/
public interface EventListener {
}
9 changes: 0 additions & 9 deletions ripc-core/src/main/java/io/ripc/core/Function.java

This file was deleted.

12 changes: 12 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/Handler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.ripc.core;

/**
* A {@code Handler} takes an arbitrary object as input and "handles" it.
*/
public interface Handler<T> {
/**
* Handle the given object (do something useful with it).
* @param obj
*/
void handle(T obj);
}
16 changes: 16 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/Interceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.ripc.core;

/**
* An {@code Interceptor} takes one object, intercepts it, and transforms it into something else.
*/
public interface Interceptor<T, V> {

/**
* Transform T to V.
*
* @param obj
* @return
*/
V intercept(T obj);

}
17 changes: 17 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/Publishers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.ripc.core;

import org.reactivestreams.Publisher;

/**
* Created by jbrisbin on 3/26/15.
*/
public final class Publishers {

private Publishers() {
}

public static <T> Publisher<?> just(final T obj) {
return new SingletonPublisher<>(obj);
}

}
52 changes: 52 additions & 0 deletions ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.ripc.core;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@code SingletonPublisher} provides a single value only once and then calls {@code onComplete}. If the value is
* {@code null}, then the {@link org.reactivestreams.Subscriber#onNext(Object)} is not called but {@link
* org.reactivestreams.Subscriber#onComplete()} is.
*/
public class SingletonPublisher<T> implements Publisher<T>, DemandCalculator {

private final AtomicBoolean requested = new AtomicBoolean(false);

private final T value;

public SingletonPublisher(T value) {
this.value = value;
}

@Override
public long calculateDemand(long pending) {
return (requested.get() ? -1 : 1);
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (!Specification.spec_3_9_verifyPositiveDemand(n, subscriber)) {
return;
}
if (requested.compareAndSet(false, true)) {
if (null != value) {
subscriber.onNext(value);
}
subscriber.onComplete();
}
}

@Override
public void cancel() {
requested.set(true);
}
});
}

}
Loading