Skip to content

Commit 46b1049

Browse files
committed
Upgrade to 1.0 RC4 of RS.
Closes #142.
1 parent 442d5c3 commit 46b1049

File tree

7 files changed

+34
-12
lines changed

7 files changed

+34
-12
lines changed

rxjava-reactive-streams/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ apply plugin: 'java'
44

55
dependencies {
66
compile 'io.reactivex:rxjava:1.0.+'
7-
compile 'org.reactivestreams:reactive-streams:1.0.0.RC1'
8-
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0.RC1'
7+
compile 'org.reactivestreams:reactive-streams:1.0.0.RC4'
8+
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0.RC4'
99
}
1010

1111
test {

rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAdapter.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashSet;
2424
import java.util.Set;
2525
import java.util.concurrent.atomic.AtomicBoolean;
26-
import java.util.concurrent.atomic.AtomicLong;
2726

2827
public class PublisherAdapter<T> implements Publisher<T> {
2928

@@ -40,16 +39,10 @@ public void subscribe(final Subscriber<? super T> s) {
4039
if (subscribers.add(s)) {
4140
observable.subscribe(new rx.Subscriber<T>() {
4241
private final AtomicBoolean done = new AtomicBoolean();
43-
private final AtomicLong pending = new AtomicLong(Long.MIN_VALUE);
4442

4543
private void doRequest(long n) {
4644
if (!done.get()) {
47-
if (pending.addAndGet(n) >= 0) {
48-
unsubscribe();
49-
onError(new IllegalStateException("Violation of rule 3.17 - more than Long.MAX_VALUE elements requested"));
50-
} else {
51-
request(n);
52-
}
45+
request(n);
5346
}
5447
}
5548

@@ -105,7 +98,6 @@ public void onError(Throwable e) {
10598
@Override
10699
public void onNext(T t) {
107100
if (!done.get()) {
108-
pending.decrementAndGet();
109101
s.onNext(t);
110102
}
111103
}

rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java

+10
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public SubscriberAdapter(rx.Subscriber<? super T> rxSubscriber) {
3434

3535
@Override
3636
public void onSubscribe(final Subscription rsSubscription) {
37+
if (rsSubscription == null) {
38+
throw new NullPointerException("onSubscribe(null)");
39+
}
40+
3741
if (started.compareAndSet(false, true)) {
3842
rxSubscriber.add(Subscriptions.create(new Action0() {
3943
@Override
@@ -57,11 +61,17 @@ public void request(long n) {
5761

5862
@Override
5963
public void onNext(T t) {
64+
if (t == null) {
65+
throw new NullPointerException("onNext(null)");
66+
}
6067
rxSubscriber.onNext(t);
6168
}
6269

6370
@Override
6471
public void onError(Throwable t) {
72+
if (t == null) {
73+
throw new NullPointerException("onError(null)");
74+
}
6575
rxSubscriber.onError(t);
6676
}
6777

rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckPublisherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public Publisher<Long> createPublisher(long elements) {
4444
}
4545

4646
@Override
47-
public Publisher<Long> createErrorStatePublisher() {
47+
public Publisher<Long> createFailedPublisher() {
4848
// Null because we always successfully subscribe.
4949
// If the observable is in error state, it will subscribe and then emit the error as the first item
5050
// This is not an “error state” publisher as defined by RS

rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberBlackboxTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public void onNext(Long aLong) {
6161
});
6262
}
6363

64+
@Override
65+
public Long createElement(int element) {
66+
return Long.valueOf(Integer.toString(element));
67+
}
68+
6469
@Override
6570
public Publisher<Long> createHelperPublisher(long elements) {
6671
return RxReactiveStreams.toPublisher(Observable.from(new CountdownIterable(elements)));

rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberWhiteboxTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public void onComplete() {
108108
};
109109
}
110110

111+
@Override
112+
public Long createElement(int element) {
113+
return Long.valueOf(Integer.toString(element));
114+
}
115+
111116
@Override
112117
public Publisher<Long> createHelperPublisher(long elements) {
113118
return RxReactiveStreams.toPublisher(Observable.from(new CountdownIterable(elements)));

rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/RsSubscriber.java

+10
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,18 @@ public Wait(long count, CountDownLatch latch) {
4545

4646
@Override
4747
public void onSubscribe(Subscription s) {
48+
if (s == null) {
49+
throw new NullPointerException("onSubscribe(null)");
50+
}
51+
4852
subscription = s;
4953
}
5054

5155
@Override
5256
public void onNext(T t) {
57+
if (t == null) {
58+
throw new NullPointerException("onNext(null)");
59+
}
5360
received.add(t);
5461
for (Wait wait : waits) {
5562
if (received.size() >= wait.count) {
@@ -60,6 +67,9 @@ public void onNext(T t) {
6067

6168
@Override
6269
public void onError(Throwable t) {
70+
if (t == null) {
71+
throw new NullPointerException("onError(null)");
72+
}
6373
error = t;
6474
unwaitAll();
6575
}

0 commit comments

Comments
 (0)