Skip to content

Commit bcd1e46

Browse files
committed
Add toSubscriber() for converting subscriber types.
Closes #24.
1 parent 354856d commit bcd1e46

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,19 @@ public static <T> Observable<T> toObservable(final Publisher<T> publisher) {
5555
return Observable.create(new Observable.OnSubscribe<T>() {
5656
@Override
5757
public void call(final rx.Subscriber<? super T> rxSubscriber) {
58-
publisher.subscribe(new SubscriberAdapter<T>(rxSubscriber));
58+
publisher.subscribe(toSubscriber(rxSubscriber));
5959
}
6060
});
6161
}
6262

63+
/**
64+
* Convert an RxJava {@link rx.Subscriber} into a Reactive Streams {@link org.reactivestreams.Subscriber}.
65+
*
66+
* @param rxSubscriber an RxJava subscriber
67+
* @return a Reactive Streams subscriber
68+
*/
69+
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(final rx.Subscriber<T> rxSubscriber) {
70+
return new SubscriberAdapter<T>(rxSubscriber);
71+
}
72+
6373
}

0 commit comments

Comments
 (0)