-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathInterop101.java
104 lines (81 loc) · 2.88 KB
/
Interop101.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package org.reactivestreams.examples;
import akka.actor.ActorSystem;
import akka.stream.ActorFlowMaterializer;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.reactivestreams.Publisher;
import ratpack.http.ResponseChunks;
import ratpack.server.RatpackServer;
import reactor.Environment;
import reactor.io.codec.StringCodec;
import reactor.io.net.NetStreams;
import reactor.io.net.http.HttpClient;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import scala.runtime.BoxedUnit;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Reactive Streams interoperability demo: a single stream of values is handed from
 * RxJava to Akka Streams to Reactor, served over HTTP by Ratpack, fetched back with a
 * Reactor HTTP client and finally printed through RxJava again. Every hand-over goes
 * through a plain {@link Publisher}.
 *
 * <p>Original credit and code: Roland Kuhn,
 * https://github.com/rkuhn/ReactiveStreamsInterop/blob/master/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java
 *
 * @author Stephane Maldini
 */
public class Interop101 {

    /**
     * Builds the cross-library pipeline, serves it over HTTP, requests it once and prints
     * every received element to standard output.
     *
     * <p>The actor system and, once it has been started, the HTTP server are shut down in
     * {@code finally} blocks so that a failure anywhere in the demo does not leave them
     * running.
     *
     * @param args command line arguments; not used
     * @throws Exception if any step of the demo fails (declared because the server
     *                   start/stop calls are made without local handling)
     */
    public static void main(String... args) throws Exception {
        final ActorSystem system = ActorSystem.create("InteropTest");
        try {
            final FlowMaterializer mat = ActorFlowMaterializer.create(system);
            Environment.initialize().assignErrorJournal();

            final int chunks = 20;

            // RxJava Observable emitting the integers 1..chunks
            final Observable<Integer> intObs = Observable.range(1, chunks);

            // RxJava -> Reactive Streams Publisher
            final Publisher<Integer> intPub = RxReactiveStreams.toPublisher(intObs);

            // Akka Streams Source: stringify each integer and accumulate the running
            // concatenation, starting from the empty string
            final Source<String, BoxedUnit> stringSource = Source
                    .from(intPub)
                    .map(Object::toString)
                    .scan("", (prev, next) -> prev + next);

            // Akka Streams -> Reactive Streams Publisher
            final Publisher<String> stringPub = stringSource.runWith(Sink.fanoutPublisher(1, 1), mat);

            // Reactor Stream wrapping the Akka-backed publisher: logs under "reactor.map"
            // and appends "!" to every element
            final Stream<String> linesStream = Streams
                    .wrap(stringPub)
                    .log("reactor.map")
                    .map(i -> i + "!");

            // A Ratpack http server answering GET on a single ":name" path segment
            final RatpackServer server = RatpackServer.of(spec -> spec
                    .handlers(chain -> chain
                            .get(":name", ctx ->
                                    // and now render the HTTP response as string chunks
                                    ctx.render(ResponseChunks.stringChunks(ctx.stream(linesStream)))
                            )
                    )
            );
            server.start();
            try {
                // A Reactor http client.
                // NOTE(review): port 5050 is presumably the port the Ratpack server binds to
                // when none is configured - confirm against the Ratpack version in use.
                final HttpClient<String, String> client = NetStreams.httpClient(conf ->
                        conf.connect("localhost", 5050).codec(new StringCodec())
                );

                // Fire a get request that will only ping "anchorMan"
                final Promise<List<String>> result = client.get("/anchorMan")
                        // flatten the chunked results and aggregate into a single List "Promise"
                        .flatMap(replies ->
                                replies
                                        .log("reactor.replies")
                                        .toList()
                        );

                // Subscribe to the result with the RxJava ReactiveStreams bridge, flatten the
                // list back into individual elements and print each one; the blocking
                // forEach keeps main from reaching the shutdown code before the reply arrives
                RxReactiveStreams
                        .toObservable(result)
                        .flatMap(Observable::from)
                        .toBlocking()
                        .forEach(System.out::println);
            } finally {
                // shutdown server, even when the request or the subscription failed
                server.stop();
            }
        } finally {
            // shutdown actors, even when anything above failed
            system.shutdown();
        }
    }
}