Skip to content

Commit 7579cba

Browse files
rkuhnbenjchristensen
authored andcommitted
API/SPI Combination + Contract Details and Examples
Squash of #37 Work as a result of discussion in #19 Removes TCK implementation so it is not out of sync with API as per #39
1 parent 287ca7a commit 7579cba

32 files changed

+579
-2497
lines changed

Diff for: README.md

+65-31
Large diffs are not rendered by default.

Diff for: api/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/bin

Diff for: spi/build.sbt renamed to api/build.sbt

File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivestreams.example.multicast;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
public class MulticastExample {
6+
7+
/**
8+
* Each subscribe will join an existing stream.
9+
*
10+
* @param args
11+
* @throws InterruptedException
12+
*/
13+
public static void main(String... args) throws InterruptedException {
14+
Publisher<Stock> dataStream = new StockPricePublisher();
15+
16+
dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite
17+
dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite
18+
Thread.sleep(5000);
19+
dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20
20+
Thread.sleep(5000);
21+
dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.reactivestreams.example.multicast;
2+
3+
import java.util.Arrays;
4+
import java.util.HashSet;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
8+
/**
9+
* Simulate a network connection that is firing data at you.
10+
* <p>
11+
* Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher`
12+
*/
13+
public class NeverEndingStockStream {
14+
15+
private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream();
16+
17+
private NeverEndingStockStream() {
18+
}
19+
20+
// using array because it is far faster than list/set for iteration
21+
// which is where most of the time in the tight loop will go (... well, beside object allocation)
22+
private volatile Handler[] handlers = new Handler[0];
23+
24+
public static synchronized void addHandler(Handler handler) {
25+
if (INSTANCE.handlers.length == 0) {
26+
INSTANCE.handlers = new Handler[] { handler };
27+
} else {
28+
Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1];
29+
System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length);
30+
newHandlers[newHandlers.length - 1] = handler;
31+
INSTANCE.handlers = newHandlers;
32+
}
33+
INSTANCE.startIfNeeded();
34+
}
35+
36+
public static synchronized void removeHandler(Handler handler) {
37+
// too lazy to do the array handling
38+
HashSet<Handler> set = new HashSet<>(Arrays.asList(INSTANCE.handlers));
39+
set.remove(handler);
40+
INSTANCE.handlers = set.toArray(new Handler[set.size()]);
41+
}
42+
43+
public static interface Handler {
44+
public void handle(Stock event);
45+
}
46+
47+
private final AtomicBoolean running = new AtomicBoolean(false);
48+
private final AtomicLong stockIndex = new AtomicLong();
49+
50+
private void startIfNeeded() {
51+
if (running.compareAndSet(false, true)) {
52+
new Thread(new Runnable() {
53+
54+
@Override
55+
public void run() {
56+
while (handlers.length > 0) {
57+
long l = stockIndex.incrementAndGet();
58+
Stock s = new Stock(l);
59+
for (Handler h : handlers) {
60+
h.handle(s);
61+
}
62+
try {
63+
// just so it is someone sane how fast this is moving
64+
Thread.sleep(1);
65+
} catch (InterruptedException e) {
66+
}
67+
}
68+
running.set(false);
69+
}
70+
71+
}).start();
72+
}
73+
}
74+
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.reactivestreams.example.multicast;
2+
3+
public class Stock {
4+
5+
private final long l;
6+
7+
public Stock(long l) {
8+
this.l = l;
9+
}
10+
11+
public long getPrice() {
12+
return l;
13+
}
14+
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.reactivestreams.example.multicast;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Publisher;
8+
import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler;
9+
10+
/**
11+
* Publisher of stock prices from a never ending stream.
12+
* <p>
13+
* It will share a single underlying stream with as many subscribers as it receives.
14+
* <p>
15+
* If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc).
16+
*/
17+
public class StockPricePublisher implements Publisher<Stock> {
18+
19+
@Override
20+
public void subscribe(final Subscriber<Stock> s) {
21+
s.onSubscribe(new Subscription() {
22+
23+
AtomicInteger capacity = new AtomicInteger();
24+
EventHandler handler = new EventHandler(s, capacity);
25+
26+
@Override
27+
public void request(int n) {
28+
if (capacity.getAndAdd(n) == 0) {
29+
// was at 0, so start up consumption again
30+
startConsuming();
31+
}
32+
}
33+
34+
@Override
35+
public void cancel() {
36+
System.out.println("StockPricePublisher => Cancel Subscription");
37+
NeverEndingStockStream.removeHandler(handler);
38+
}
39+
40+
public void startConsuming() {
41+
NeverEndingStockStream.addHandler(handler);
42+
}
43+
44+
});
45+
46+
}
47+
48+
private static final class EventHandler implements Handler {
49+
private final Subscriber<Stock> s;
50+
private final AtomicInteger capacity;
51+
52+
private EventHandler(Subscriber<Stock> s, AtomicInteger capacity) {
53+
this.s = s;
54+
this.capacity = capacity;
55+
}
56+
57+
@Override
58+
public void handle(Stock event) {
59+
int c = capacity.get();
60+
if (c == 0) {
61+
// shortcut instead of doing decrement/increment loops while no capacity
62+
return;
63+
}
64+
if (capacity.getAndDecrement() > 0) {
65+
s.onNext(event);
66+
} else {
67+
// we just decremented below 0 so increment back one
68+
capacity.incrementAndGet();
69+
}
70+
}
71+
}
72+
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.reactivestreams.example.multicast;
2+
3+
import java.util.concurrent.ArrayBlockingQueue;
4+
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.Subscriber;
7+
8+
public class StockPriceSubscriber implements Subscriber<Stock> {
9+
10+
private final ArrayBlockingQueue<Stock> buffer;
11+
private final int delayPerStock;
12+
private volatile boolean terminated = false;
13+
private final int take;
14+
15+
public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) {
16+
this.buffer = new ArrayBlockingQueue<>(bufferSize);
17+
this.delayPerStock = delayPerStock;
18+
this.take = take;
19+
}
20+
21+
public StockPriceSubscriber(int bufferSize, int delayPerStock) {
22+
this(bufferSize, delayPerStock, -1);
23+
}
24+
25+
@Override
26+
public void onSubscribe(Subscription s) {
27+
System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity());
28+
s.request(buffer.remainingCapacity());
29+
startAsyncWork(s);
30+
}
31+
32+
@Override
33+
public void onNext(Stock t) {
34+
buffer.add(t);
35+
}
36+
37+
@Override
38+
public void onError(Throwable t) {
39+
terminated = true;
40+
throw new RuntimeException(t);
41+
}
42+
43+
@Override
44+
public void onComplete() {
45+
terminated = true;
46+
}
47+
48+
private void startAsyncWork(final Subscription s) {
49+
System.out.println("StockPriceSubscriber => Start new worker thread");
50+
/* don't write real code like this! just for quick demo */
51+
new Thread(new Runnable() {
52+
public void run() {
53+
int received = 0;
54+
55+
while (!terminated) {
56+
Stock v = buffer.poll();
57+
try {
58+
Thread.sleep(delayPerStock);
59+
} catch (Exception e) {
60+
e.printStackTrace();
61+
}
62+
if (buffer.size() < 3) {
63+
s.request(buffer.remainingCapacity());
64+
}
65+
if (v != null) {
66+
received++;
67+
System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice());
68+
if (take > 0 && received >= take) {
69+
s.cancel();
70+
terminated = true;
71+
}
72+
}
73+
}
74+
}
75+
}).start();
76+
}
77+
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Publisher;
8+
9+
class InfiniteIncrementNumberPublisher implements Publisher<Integer> {
10+
11+
@Override
12+
public void subscribe(final Subscriber<Integer> s) {
13+
14+
final AtomicInteger i = new AtomicInteger();
15+
16+
Subscription subscription = new Subscription() {
17+
18+
AtomicInteger capacity = new AtomicInteger();
19+
20+
@Override
21+
public void request(int n) {
22+
System.out.println("signalAdditionalDemand => " + n);
23+
if (capacity.getAndAdd(n) == 0) {
24+
// start sending again if it wasn't already running
25+
send();
26+
}
27+
}
28+
29+
private void send() {
30+
System.out.println("send => " + capacity.get());
31+
// this would normally use an eventloop, actor, whatever
32+
new Thread(new Runnable() {
33+
34+
public void run() {
35+
do {
36+
s.onNext(i.incrementAndGet());
37+
} while (capacity.decrementAndGet() > 0);
38+
}
39+
}).start();
40+
}
41+
42+
@Override
43+
public void cancel() {
44+
capacity.set(-1);
45+
}
46+
47+
};
48+
49+
s.onSubscribe(subscription);
50+
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import java.util.concurrent.ArrayBlockingQueue;
4+
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.Subscriber;
7+
8+
class NumberSubscriberThatHopsThreads implements Subscriber<Integer> {
9+
10+
final int BUFFER_SIZE = 10;
11+
private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
12+
private volatile boolean terminated = false;
13+
private final String token;
14+
15+
NumberSubscriberThatHopsThreads(String token) {
16+
this.token = token;
17+
}
18+
19+
@Override
20+
public void onSubscribe(Subscription s) {
21+
System.out.println("onSubscribe => request " + BUFFER_SIZE);
22+
s.request(BUFFER_SIZE);
23+
startAsyncWork(s);
24+
}
25+
26+
@Override
27+
public void onNext(Integer t) {
28+
buffer.add(t);
29+
}
30+
31+
@Override
32+
public void onError(Throwable t) {
33+
terminated = true;
34+
throw new RuntimeException(t);
35+
}
36+
37+
@Override
38+
public void onComplete() {
39+
terminated = true;
40+
}
41+
42+
private void startAsyncWork(final Subscription s) {
43+
System.out.println("**** Start new worker thread");
44+
/* don't write real code like this! just for quick demo */
45+
new Thread(new Runnable() {
46+
public void run() {
47+
while (!terminated) {
48+
Integer v = buffer.poll();
49+
try {
50+
Thread.sleep(100);
51+
} catch (Exception e) {
52+
e.printStackTrace();
53+
}
54+
if (buffer.size() < 3) {
55+
s.request(BUFFER_SIZE - buffer.size());
56+
}
57+
if (v != null) {
58+
System.out.println(token + " => Did stuff with v: " + v);
59+
}
60+
}
61+
}
62+
}).start();
63+
}
64+
}

0 commit comments

Comments
 (0)