File tree 5 files changed +31
-30
lines changed
main/java/rx/internal/reactivestreams
test/java/rx/reactivestreams
5 files changed +31
-30
lines changed Original file line number Diff line number Diff line change @@ -2,8 +2,8 @@ description = "Adapter between RxJava and ReactiveStreams"
2
2
3
3
dependencies {
4
4
compile ' io.reactivex:rxjava:1.0.+'
5
- compile ' org.reactivestreams:reactive-streams:0.4.0 '
6
- testCompile ' org.reactivestreams:reactive-streams-tck:0.4.0 '
5
+ compile ' org.reactivestreams:reactive-streams:1.0.0.M1 '
6
+ testCompile ' org.reactivestreams:reactive-streams-tck:1.0.0.M1 '
7
7
}
8
8
9
9
test {
Original file line number Diff line number Diff line change 23
23
import java .util .HashSet ;
24
24
import java .util .Set ;
25
25
import java .util .concurrent .atomic .AtomicBoolean ;
26
+ import java .util .concurrent .atomic .AtomicLong ;
26
27
27
28
public class PublisherAdapter <T > implements Publisher <T > {
28
29
@@ -39,9 +40,17 @@ public void subscribe(final Subscriber<? super T> s) {
39
40
if (subscribers .add (s )) {
40
41
observable .subscribe (new rx .Subscriber <T >() {
41
42
private final AtomicBoolean done = new AtomicBoolean ();
43
+ private final AtomicLong pending = new AtomicLong (Long .MIN_VALUE );
42
44
43
45
private void doRequest (long n ) {
44
- request (n );
46
+ if (!done .get ()) {
47
+ if (pending .addAndGet (n ) >= 0 ) {
48
+ unsubscribe ();
49
+ onError (new IllegalStateException ("Violation of rule 3.17 - more than Long.MAX_VALUE elements requested" ));
50
+ } else {
51
+ request (n );
52
+ }
53
+ }
45
54
}
46
55
47
56
@ Override
@@ -70,27 +79,34 @@ public void cancel() {
70
79
}
71
80
}
72
81
73
- private void fireDone () {
74
- if (done .compareAndSet (false , true )) {
82
+ private boolean fireDone () {
83
+ boolean first = done .compareAndSet (false , true );
84
+ if (first ) {
75
85
subscribers .remove (s );
76
86
}
87
+ return first ;
77
88
}
78
89
79
90
@ Override
80
91
public void onCompleted () {
81
- s .onComplete ();
92
+ if (fireDone ()) {
93
+ s .onComplete ();
94
+ }
82
95
}
83
96
84
97
@ Override
85
98
public void onError (Throwable e ) {
86
- s .onError (e );
87
- fireDone ();
99
+ if (fireDone ()) {
100
+ s .onError (e );
101
+ }
88
102
}
89
103
90
104
@ Override
91
105
public void onNext (T t ) {
92
- s .onNext (t );
93
- fireDone ();
106
+ if (!done .get ()) {
107
+ pending .decrementAndGet ();
108
+ s .onNext (t );
109
+ }
94
110
}
95
111
});
96
112
} else {
Original file line number Diff line number Diff line change 23
23
import rx .Observable ;
24
24
import rx .RxReactiveStreams ;
25
25
import rx .reactivestreams .test .CountdownIterable ;
26
+ import rx .schedulers .Schedulers ;
26
27
27
28
@ Test
28
29
public class TckPublisherTest extends PublisherVerification <Long > {
@@ -34,14 +35,12 @@ public TckPublisherTest() {
34
35
super (new TestEnvironment (DEFAULT_TIMEOUT_MILLIS ), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS );
35
36
}
36
37
37
- @ Override
38
- public long maxElementsFromPublisher () {
39
- return Long .MAX_VALUE ;
40
- }
41
-
42
38
@ Override
43
39
public Publisher <Long > createPublisher (long elements ) {
44
- return RxReactiveStreams .toPublisher (Observable .from (new CountdownIterable (elements )));
40
+ return RxReactiveStreams .toPublisher (
41
+ Observable .from (new CountdownIterable (elements ))
42
+ .observeOn (Schedulers .computation ())
43
+ );
45
44
}
46
45
47
46
@ Override
Original file line number Diff line number Diff line change @@ -66,5 +66,4 @@ public Publisher<Long> createHelperPublisher(long elements) {
66
66
return RxReactiveStreams .toPublisher (Observable .from (new CountdownIterable (elements )));
67
67
}
68
68
69
-
70
69
}
Original file line number Diff line number Diff line change @@ -113,17 +113,4 @@ public Publisher<Long> createHelperPublisher(long elements) {
113
113
return RxReactiveStreams .toPublisher (Observable .from (new CountdownIterable (elements )));
114
114
}
115
115
116
- public void spec309_callingRequestWithNegativeNumberMustThrow () throws Throwable {
117
- notVerified (); // nonsense test, subscriber doesn't create any Subscription implementations
118
- }
119
-
120
- @ Override
121
- public void spec309_callingRequestZeroMustThrow () throws Throwable {
122
- notVerified (); // nonsense test, subscriber doesn't create any Subscription implementations
123
- }
124
-
125
- @ Override
126
- public void spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue () throws Throwable {
127
- notVerified (); // nonsense test, the publisher should implement this
128
- }
129
116
}
You can’t perform that action at this time.
0 commit comments