From d76e5e4daa45799859ac981826c6b0bbe1781324 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 2 Sep 2014 10:13:25 +0200 Subject: [PATCH 1/2] allow contravariant use of Subscriber fixes #104 --- api/src/main/java/org/reactivestreams/Publisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/org/reactivestreams/Publisher.java b/api/src/main/java/org/reactivestreams/Publisher.java index 831340e7..d370ca52 100644 --- a/api/src/main/java/org/reactivestreams/Publisher.java +++ b/api/src/main/java/org/reactivestreams/Publisher.java @@ -16,5 +16,5 @@ public interface Publisher { * * @param s the {@link Subscriber} that will consume signals from this {@link Publisher} */ - public void subscribe(Subscriber s); + public void subscribe(Subscriber s); } From dbc5dadf749b7e0ab95d832e047adace2256d875 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 2 Sep 2014 11:21:34 +0200 Subject: [PATCH 2/2] also update README to match new Publisher.subscribe (and fix one case of request(int) in the examples) --- README.md | 2 +- .../example/multicast/StockPricePublisher.java | 6 +++--- .../example/unicast/InfiniteIncrementNumberPublisher.java | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a6835293..ddd83f04 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?) ```java public interface Publisher { - public void subscribe(Subscriber s); + public void subscribe(Subscriber s); } ```` diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java index a9fab1b4..77ce160a 100644 --- a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -17,7 +17,7 @@ public class StockPricePublisher implements Publisher { @Override - public void subscribe(final Subscriber s) { + public void subscribe(final Subscriber s) { s.onSubscribe(new Subscription() { AtomicLong capacity = new AtomicLong(); @@ -46,10 +46,10 @@ public void startConsuming() { } private static final class EventHandler implements Handler { - private final Subscriber s; + private final Subscriber s; private final AtomicLong capacity; - private EventHandler(Subscriber s, AtomicLong capacity) { + private EventHandler(Subscriber s, AtomicLong capacity) { this.s = s; this.capacity = capacity; } diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java index dc1a432e..528bf0bf 100644 --- a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java +++ b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -1,6 +1,7 @@ package org.reactivestreams.example.unicast; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscription; import org.reactivestreams.Subscriber; @@ -9,13 +10,13 @@ class InfiniteIncrementNumberPublisher implements Publisher { @Override - public void subscribe(final Subscriber s) { + public void subscribe(final Subscriber s) { final AtomicInteger i = new AtomicInteger(); Subscription subscription = new Subscription() { - AtomicInteger capacity = new AtomicInteger(); + AtomicLong capacity = new AtomicLong(); @Override public void request(long n) {