Skip to content

Remove Consumer/Processor/Producer #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.reactivestreams.example.multicast;

import org.reactivestreams.Publisher;

public class MulticastExample {

/**
* Each subscribe will join an existing stream.
*
* @param args
* @throws InterruptedException
*/
public static void main(String... args) throws InterruptedException {
Publisher<Stock> dataStream = new StockPricePublisher();

dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite
dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite
Thread.sleep(5000);
dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20
Thread.sleep(5000);
dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.reactivestreams.example.multicast;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Simulate a network connection that is firing data at you.
* <p>
* Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher`
*/
public class NeverEndingStockStream {

private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream();

private NeverEndingStockStream() {
}

// using array because it is far faster than list/set for iteration
// which is where most of the time in the tight loop will go (... well, beside object allocation)
private volatile Handler[] handlers = new Handler[0];

public static synchronized void addHandler(Handler handler) {
if (INSTANCE.handlers.length == 0) {
INSTANCE.handlers = new Handler[] { handler };
} else {
Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1];
System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length);
newHandlers[newHandlers.length - 1] = handler;
INSTANCE.handlers = newHandlers;
}
INSTANCE.startIfNeeded();
}

public static synchronized void removeHandler(Handler handler) {
// too lazy to do the array handling
HashSet<Handler> set = new HashSet<>(Arrays.asList(INSTANCE.handlers));
set.remove(handler);
INSTANCE.handlers = set.toArray(new Handler[set.size()]);
}

public static interface Handler {
public void handle(Stock event);
}

private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicLong stockIndex = new AtomicLong();

private void startIfNeeded() {
if (running.compareAndSet(false, true)) {
new Thread(new Runnable() {

@Override
public void run() {
while (handlers.length > 0) {
long l = stockIndex.incrementAndGet();
Stock s = new Stock(l);
for (Handler h : handlers) {
h.handle(s);
}
try {
// just so it is someone sane how fast this is moving
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
running.set(false);
}

}).start();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.reactivestreams.example.multicast;

public class Stock {

private final long l;

public Stock(long l) {
this.l = l;
}

public long getPrice() {
return l;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.reactivestreams.example.multicast;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler;

/**
* Publisher of stock prices from a never ending stream.
* <p>
* It will share a single underlying stream with as many subscribers as it receives.
* <p>
* If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc).
*/
public class StockPricePublisher implements Publisher<Stock> {

@Override
public void subscribe(final Subscriber<Stock> s) {
s.onSubscribe(new Subscription() {

AtomicInteger capacity = new AtomicInteger();
EventHandler handler = new EventHandler(s, capacity);

@Override
public void signalAdditionalDemand(int n) {
if (capacity.getAndAdd(n) == 0) {
// was at 0, so start up consumption again
startConsuming();
}
}

@Override
public void cancel() {
System.out.println("StockPricePublisher => Cancel Subscription");
NeverEndingStockStream.removeHandler(handler);
}

public void startConsuming() {
NeverEndingStockStream.addHandler(handler);
}

});

}

private static final class EventHandler implements Handler {
private final Subscriber<Stock> s;
private final AtomicInteger capacity;

private EventHandler(Subscriber<Stock> s, AtomicInteger capacity) {
this.s = s;
this.capacity = capacity;
}

@Override
public void handle(Stock event) {
int c = capacity.get();
if (c == 0) {
// shortcut instead of doing decrement/increment loops while no capacity
return;
}
if (capacity.getAndDecrement() > 0) {
s.onNext(event);
} else {
// we just decremented below 0 so increment back one
capacity.incrementAndGet();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.reactivestreams.example.multicast;

import java.util.concurrent.ArrayBlockingQueue;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class StockPriceSubscriber implements Subscriber<Stock> {

private final ArrayBlockingQueue<Stock> buffer;
private final int delayPerStock;
private volatile boolean terminated = false;
private final int take;

public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) {
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.delayPerStock = delayPerStock;
this.take = take;
}

public StockPriceSubscriber(int bufferSize, int delayPerStock) {
this(bufferSize, delayPerStock, -1);
}

@Override
public void onSubscribe(Subscription s) {
System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity());
s.signalAdditionalDemand(buffer.remainingCapacity());
startAsyncWork(s);
}

@Override
public void onNext(Stock t) {
buffer.add(t);
}

@Override
public void onError(Throwable t) {
terminated = true;
throw new RuntimeException(t);
}

@Override
public void onCompleted() {
terminated = true;
}

private void startAsyncWork(final Subscription s) {
System.out.println("StockPriceSubscriber => Start new worker thread");
/* don't write real code like this! just for quick demo */
new Thread(new Runnable() {
public void run() {
int received = 0;

while (!terminated) {
Stock v = buffer.poll();
try {
Thread.sleep(delayPerStock);
} catch (Exception e) {
e.printStackTrace();
}
if (buffer.size() < 3) {
s.signalAdditionalDemand(buffer.remainingCapacity());
}
if (v != null) {
received++;
System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice());
if (take > 0 && received >= take) {
s.cancel();
terminated = true;
}
}
}
}
}).start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.reactivestreams.example.unicast;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class InfiniteIncrementNumberPublisher implements Publisher<Integer> {

@Override
public void subscribe(final Subscriber<Integer> s) {

final AtomicInteger i = new AtomicInteger();

Subscription subscription = new Subscription() {

AtomicInteger capacity = new AtomicInteger();

@Override
public void signalAdditionalDemand(int n) {
System.out.println("signalAdditionalDemand => " + n);
if (capacity.getAndAdd(n) == 0) {
// start sending again if it wasn't already running
send();
}
}

private void send() {
System.out.println("send => " + capacity.get());
// this would normally use an eventloop, actor, whatever
new Thread(new Runnable() {

public void run() {
do {
s.onNext(i.incrementAndGet());
} while (capacity.decrementAndGet() > 0);
}
}).start();
}

@Override
public void cancel() {
capacity.set(-1);
}

};

s.onSubscribe(subscription);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.reactivestreams.example.unicast;

import java.util.concurrent.ArrayBlockingQueue;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class NumberSubscriberThatHopsThreads implements Subscriber<Integer> {

final int BUFFER_SIZE = 10;
private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
private volatile boolean terminated = false;
private final String token;

NumberSubscriberThatHopsThreads(String token) {
this.token = token;
}

@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe => request " + BUFFER_SIZE);
s.signalAdditionalDemand(BUFFER_SIZE);
startAsyncWork(s);
}

@Override
public void onNext(Integer t) {
buffer.add(t);
}

@Override
public void onError(Throwable t) {
terminated = true;
throw new RuntimeException(t);
}

@Override
public void onCompleted() {
terminated = true;
}

private void startAsyncWork(final Subscription s) {
System.out.println("**** Start new worker thread");
/* don't write real code like this! just for quick demo */
new Thread(new Runnable() {
public void run() {
while (!terminated) {
Integer v = buffer.poll();
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
if (buffer.size() < 3) {
s.signalAdditionalDemand(BUFFER_SIZE - buffer.size());
}
if (v != null) {
System.out.println(token + " => Did stuff with v: " + v);
}
}
}
}).start();
}
}
Loading