File tree 4 files changed +8
-7
lines changed
examples/java/org/reactivestreams/example
main/java/org/reactivestreams
4 files changed +8
-7
lines changed Original file line number Diff line number Diff line change @@ -69,7 +69,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
69
69
70
70
``` java
71
71
public interface Publisher <T> {
72
- public void subscribe (Subscriber<T > s );
72
+ public void subscribe (Subscriber<? super T > s );
73
73
}
74
74
````
75
75
Original file line number Diff line number Diff line change 17
17
public class StockPricePublisher implements Publisher <Stock > {
18
18
19
19
@ Override
20
- public void subscribe (final Subscriber <Stock > s ) {
20
+ public void subscribe (final Subscriber <? super Stock > s ) {
21
21
s .onSubscribe (new Subscription () {
22
22
23
23
AtomicLong capacity = new AtomicLong ();
@@ -46,10 +46,10 @@ public void startConsuming() {
46
46
}
47
47
48
48
private static final class EventHandler implements Handler {
49
- private final Subscriber <Stock > s ;
49
+ private final Subscriber <? super Stock > s ;
50
50
private final AtomicLong capacity ;
51
51
52
- private EventHandler (Subscriber <Stock > s , AtomicLong capacity ) {
52
+ private EventHandler (Subscriber <? super Stock > s , AtomicLong capacity ) {
53
53
this .s = s ;
54
54
this .capacity = capacity ;
55
55
}
Original file line number Diff line number Diff line change 1
1
package org .reactivestreams .example .unicast ;
2
2
3
3
import java .util .concurrent .atomic .AtomicInteger ;
4
+ import java .util .concurrent .atomic .AtomicLong ;
4
5
5
6
import org .reactivestreams .Subscription ;
6
7
import org .reactivestreams .Subscriber ;
9
10
class InfiniteIncrementNumberPublisher implements Publisher <Integer > {
10
11
11
12
@ Override
12
- public void subscribe (final Subscriber <Integer > s ) {
13
+ public void subscribe (final Subscriber <? super Integer > s ) {
13
14
14
15
final AtomicInteger i = new AtomicInteger ();
15
16
16
17
Subscription subscription = new Subscription () {
17
18
18
- AtomicInteger capacity = new AtomicInteger ();
19
+ AtomicLong capacity = new AtomicLong ();
19
20
20
21
@ Override
21
22
public void request (long n ) {
Original file line number Diff line number Diff line change @@ -16,5 +16,5 @@ public interface Publisher<T> {
16
16
*
17
17
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
18
18
*/
19
- public void subscribe (Subscriber <T > s );
19
+ public void subscribe (Subscriber <? super T > s );
20
20
}
You can’t perform that action at this time.
0 commit comments