Skip to content

Commit 61c09fa

Browse files
committed
adjusted for upcoming reactive-streams#108 change
1 parent da7be63 commit 61c09fa

File tree

5 files changed

+18
-19
lines changed

5 files changed

+18
-19
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
6969

7070
```java
7171
public interface Publisher<T> {
72-
public void subscribe(Subscriber<T> s);
72+
public void subscribe(Subscriber<? super T> s);
7373
}
7474
````
7575

api/src/main/java/org/reactivestreams/Publisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ public interface Publisher<T> {
1616
*
1717
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
1818
*/
19-
public void subscribe(Subscriber<T> s);
19+
public void subscribe(Subscriber<? super T> s);
2020
}

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public void spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPre
244244
public void run(BlackboxTestStage stage) throws Throwable {
245245
final Publisher<T> pub = new Publisher<T>() {
246246
@Override
247-
public void subscribe(Subscriber<T> s) {
247+
public void subscribe(Subscriber<? super T> s) {
248248
s.onComplete();
249249
}
250250
};
@@ -399,7 +399,7 @@ public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws Int
399399
}
400400
}
401401

402-
public Subscriber<T> sub() {
402+
public Subscriber<? super T> sub() {
403403
return subscriber.value();
404404
}
405405

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws Int
529529
}
530530
}
531531

532-
public Subscriber<T> sub() {
532+
public Subscriber<? super T> sub() {
533533
return subscriber.value();
534534
}
535535

@@ -568,8 +568,8 @@ public void verifyNoAsyncErrors() {
568568
*/
569569
public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
570570

571-
public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
572-
super(env, Promise.completed(env, subscriber));
571+
public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<? super T> subscriber) {
572+
super(env, new Promise<Subscriber<? super T>>(env, subscriber));
573573
}
574574

575575
@Override
@@ -598,12 +598,12 @@ public void onComplete() {
598598

599599
public static class BlackboxProbe<T> implements SubscriberProbe<T> {
600600
protected final TestEnvironment env;
601-
protected final Promise<Subscriber<T>> subscriber;
601+
protected final Promise<Subscriber<? super T>> subscriber;
602602

603603
protected final Receptacle<T> elements;
604604
protected final Promise<Throwable> error;
605605

606-
public BlackboxProbe(TestEnvironment env, Promise<Subscriber<T>> subscriber) {
606+
public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
607607
this.env = env;
608608
this.subscriber = subscriber;
609609
elements = new Receptacle<T>(env);
@@ -640,7 +640,7 @@ public void expectNext(T expected, long timeoutMillis) throws InterruptedExcepti
640640
}
641641
}
642642

643-
public Subscriber<T> sub() {
643+
public Subscriber<? super T> sub() {
644644
return subscriber.value();
645645
}
646646

@@ -706,7 +706,7 @@ public void expectNone(long withinMillis) throws InterruptedException {
706706
public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
707707
protected Promise<SubscriberPuppet> puppet;
708708

709-
public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<T>> subscriber) {
709+
public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
710710
super(env, subscriber);
711711
puppet = new Promise<SubscriberPuppet>(env);
712712
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ public static class ManualPublisher<T> implements Publisher<T> {
441441
protected final TestEnvironment env;
442442

443443
protected long pendingDemand = 0L;
444-
protected Promise<Subscriber<T>> subscriber;
444+
protected Promise<Subscriber<? super T>> subscriber;
445445

446446
protected final Receptacle<Long> requests;
447447

@@ -451,11 +451,11 @@ public ManualPublisher(TestEnvironment env) {
451451
this.env = env;
452452
requests = new Receptacle<Long>(env);
453453
cancelled = new Latch(env);
454-
subscriber = new Promise<Subscriber<T>>(this.env);
454+
subscriber = new Promise<Subscriber<? super T>>(this.env);
455455
}
456456

457457
@Override
458-
public void subscribe(Subscriber<T> s) {
458+
public void subscribe(Subscriber<? super T> s) {
459459
if (!subscriber.isCompleted()) {
460460
subscriber.completeImmediatly(s);
461461

@@ -596,14 +596,13 @@ public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws Int
596596
public static class Promise<T> {
597597
private final TestEnvironment env;
598598

599-
public static <T> Promise<T> completed(TestEnvironment env, T value) {
600-
Promise<T> promise = new Promise<T>(env);
601-
promise.completeImmediatly(value);
602-
return promise;
599+
public Promise(TestEnvironment env) {
600+
this.env = env;
603601
}
604602

605-
public Promise(TestEnvironment env) {
603+
public Promise(TestEnvironment env, T value) {
606604
this.env = env;
605+
completeImmediatly(value);
607606
}
608607

609608
private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);

0 commit comments

Comments
 (0)