Implementing Your Own Operators
You can implement your own Observable operators. This page shows you how.
If your operator is designed to originate an Observable, rather than to transform or react to a source Observable, use the create( ) method rather than trying to implement Observable manually. Otherwise, you can create a custom operator by following the instructions on this page.
If your operator is designed to act on the individual items emitted by a source Observable, follow the instructions under Sequence Operators below. If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) follow the instructions under Transformational Operators below.
(Note: in Xtend, a Groovy-like language, you can implement your operators as extension methods and can thereby chain them directly without using the methods described on this page. See RxJava and Xtend for details.)
Sequence Operators
The following example shows how you can use the lift( ) operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
The following section shows how you form the scaffolding of your operator so that it will work correctly with lift( ).
Define your operator as a public class that implements the Operator interface, like so:
import rx.Observable.Operator;
import rx.Subscriber;

public class myOperator<T> implements Operator<T, T> {
    public myOperator( /* any necessary params here */ ) {
        /* any necessary initialization here */
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> s) {
        return new Subscriber<T>(s) {
            @Override
            public void onCompleted() {
                /* add your own onCompleted behavior here, or just pass the completed notification through: */
                if(!s.isUnsubscribed()) {
                    s.onCompleted();
                }
            }

            @Override
            public void onError(Throwable t) {
                /* add your own onError behavior here, or just pass the error notification through: */
                if(!s.isUnsubscribed()) {
                    s.onError(t);
                }
            }

            @Override
            public void onNext(T item) {
                /* this example performs some sort of operation on each incoming item and emits the results */
                if(!s.isUnsubscribed()) {
                    /* myOperatorTransformOperation is a placeholder for your own transformation logic */
                    T transformedItem = myOperatorTransformOperation(item);
                    s.onNext(transformedItem);
                }
            }
        };
    }
}
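To make the role of this scaffolding more concrete, here is a minimal sketch of what a lift-style method does with an operator: it hands the downstream Subscriber to the operator and subscribes the Subscriber that comes back to the source. It deliberately avoids the RxJava classes so that it runs on a plain JDK. MiniSubscriber, MiniOperator, MiniObservable, and LiftDemo are hypothetical stand-ins, and the sketch ignores unsubscription and backpressure, which the real lift( ) also has to deal with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are NOT the RxJava types.
interface MiniSubscriber<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onCompleted();
}

// An operator maps the downstream subscriber to the subscriber the source will see.
interface MiniOperator<R, T>
        extends Function<MiniSubscriber<? super R>, MiniSubscriber<? super T>> {}

final class MiniObservable<T> {
    private final Consumer<MiniSubscriber<? super T>> onSubscribe;

    MiniObservable(Consumer<MiniSubscriber<? super T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Emits every item of the list, then completes.
    static <T> MiniObservable<T> from(List<T> items) {
        return new MiniObservable<T>(s -> {
            for (T item : items) {
                s.onNext(item);
            }
            s.onCompleted();
        });
    }

    // lift: wrap the downstream subscriber with the operator, then subscribe upstream.
    <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
        return new MiniObservable<R>(
                downstream -> onSubscribe.accept(operator.apply(downstream)));
    }

    void subscribe(MiniSubscriber<? super T> s) {
        onSubscribe.accept(s);
    }
}

final class LiftDemo {
    // A sample operator that turns each Integer into a String label.
    static MiniOperator<String, Integer> labelOperator() {
        return downstream -> new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer item) { downstream.onNext("item " + item); }
            @Override public void onError(Throwable e) { downstream.onError(e); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        };
    }

    // Runs the chain and records every notification the final subscriber receives.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        MiniObservable.from(Arrays.asList(1, 2, 3))
                .lift(labelOperator())
                .subscribe(new MiniSubscriber<String>() {
                    @Override public void onNext(String item) { events.add(item); }
                    @Override public void onError(Throwable e) { events.add("error"); }
                    @Override public void onCompleted() { events.add("completed"); }
                });
        return events;
    }
}
```

Calling LiftDemo.run() in this sketch yields the three labelled items followed by a single completed notification, which is the same pass-through shape as the scaffolding above.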
- Your operator should check its Subscriber's isUnsubscribed( ) status before it emits any item to (or sends any notification to) the Subscriber. Do not waste time generating items that no Subscriber is interested in seeing.
- Your operator should obey the core tenets of the Observable contract:
  - It may call a Subscriber's onNext( ) method any number of times, but these calls must be non-overlapping.
  - It may call either a Subscriber's onCompleted( ) or onError( ) method, but not both, exactly once, and it may not subsequently call a Subscriber's onNext( ) method.
  - If you are unable to guarantee that your operator conforms to the above two tenets, you can add the serialize( ) operator to it to force the correct behavior.
- Do not block within your operator.
- It is usually best that you compose new operators by combining existing ones, to the extent that this is possible, rather than reinventing the wheel. RxJava itself does this with some of its standard operators, for example:
  - first( ) is defined as take(1).single( )
  - ignoreElements( ) is defined as filter(alwaysFalse( ))
  - reduce(a) is defined as scan(a).last( )
- If your operator uses functions or lambdas that are passed in as parameters (predicates, for instance), note that these may be sources of exceptions, and be prepared to catch these and notify subscribers via onError( ) calls.
  - Some exceptions are considered "fatal" and for them there's no point in trying to call onError( ) because that will either be futile or will just compound the problem. You can use the Exceptions.throwIfFatal(throwable) method to filter out such fatal exceptions and rethrow them rather than try to notify about them.
- In general, notify subscribers of error conditions immediately, rather than making an effort to emit more items first.
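Several of the guidelines above can be sketched together: checking isUnsubscribed( ) before emitting, catching exceptions thrown by a user-supplied function, rethrowing fatal ones, reporting errors at once, and sending at most one terminal notification. The sketch below uses hypothetical JDK-only stand-ins (MiniSubscriber, SafeMapSubscriber, GuidelinesDemo) rather than the RxJava classes. Its throwIfFatal helper is a simplified assumption about which errors count as fatal and is not the actual implementation of Exceptions.throwIfFatal(throwable).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for illustration only; this is NOT rx.Subscriber.
interface MiniSubscriber<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onCompleted();
    boolean isUnsubscribed();
}

// Wraps a downstream subscriber and applies a user-supplied function to each item.
final class SafeMapSubscriber<T, R> implements MiniSubscriber<T> {
    private final MiniSubscriber<? super R> downstream;
    private final Function<? super T, ? extends R> mapper;
    private boolean done; // true once a terminal notification has been sent

    SafeMapSubscriber(MiniSubscriber<? super R> downstream,
                      Function<? super T, ? extends R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    // Simplified assumption: treat JVM-level errors as fatal and rethrow them.
    static void throwIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) throw (VirtualMachineError) t;
        if (t instanceof LinkageError) throw (LinkageError) t;
    }

    @Override
    public void onNext(T item) {
        if (done || downstream.isUnsubscribed()) return; // nobody is listening
        R result;
        try {
            result = mapper.apply(item); // user code may throw
        } catch (Throwable t) {
            throwIfFatal(t);
            onError(t); // report at once instead of emitting further items
            return;
        }
        downstream.onNext(result);
    }

    @Override
    public void onError(Throwable e) {
        if (done || downstream.isUnsubscribed()) return;
        done = true; // at most one terminal notification
        downstream.onError(e);
    }

    @Override
    public void onCompleted() {
        if (done || downstream.isUnsubscribed()) return;
        done = true;
        downstream.onCompleted();
    }

    @Override
    public boolean isUnsubscribed() {
        return done || downstream.isUnsubscribed();
    }
}

final class GuidelinesDemo {
    // Feeds 5, 0, 2 through a mapper that divides 10 by each item.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        MiniSubscriber<Integer> downstream = new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onError(Throwable e) { events.add("error " + e.getClass().getSimpleName()); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public boolean isUnsubscribed() { return false; }
        };
        SafeMapSubscriber<Integer, Integer> s =
                new SafeMapSubscriber<Integer, Integer>(downstream, x -> 10 / x);
        for (int x : Arrays.asList(5, 0, 2)) {
            s.onNext(x);
        }
        s.onCompleted();
        return events;
    }
}
```

In this sketch the division by zero on the second item is turned into a single error notification, and neither the third item nor the later completion reaches the downstream subscriber.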
Transformational Operators
The following example shows how you can use the compose( ) operator to chain your custom operator (in this example, an operator called myTransformer that transforms an Observable that emits Integers into one that emits Strings) alongside standard RxJava operators like ofType and map:
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new myTransformer()).map({"transformed by myTransformer: " + it});
The following section shows how you form the scaffolding of your operator so that it will work correctly with compose( ).
Define your transforming function as a public class that implements the Transformer interface, like so:
import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Func1;

/* The class itself is not generic: it fixes the Transformer's type arguments to Integer and String. */
public class myTransformer implements Transformer<Integer,String> {
    public myTransformer( /* any necessary params here */ ) {
        /* any necessary initialization here */
    }

    @Override
    public Observable<String> call(Observable<Integer> source) {
        /*
         * this simple example Transformer applies map() to the source Observable as a whole
         * in order to transform the "source" observable from one that emits integers to one
         * that emits the string representations of those integers.
         */
        return source.map( new Func1<Integer,String>() {
            @Override
            public String call(Integer t1) {
                return String.valueOf(t1);
            }
        } );
    }
}
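The idea behind a Transformer, a function applied to the source as a whole rather than to each item, can be sketched without RxJava. The example below swaps in java.util.stream.Stream as a stand-in for Observable and a plain java.util.function.Function as a stand-in for Transformer. ComposeSketch, INT_TO_STRING, and the static compose helper are hypothetical names for illustration, and streams differ from Observables in many respects (they are pull-based and single-use), so treat this only as an analogy.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ComposeSketch {
    // Stand-in for a Transformer<Integer, String>: a function over the whole sequence.
    static final Function<Stream<Integer>, Stream<String>> INT_TO_STRING =
            source -> source.map(i -> String.valueOf(i));

    // Stand-in for compose(): hand the entire source to the transformer.
    static <T, R> Stream<R> compose(Stream<T> source,
                                    Function<Stream<T>, Stream<R>> transformer) {
        return transformer.apply(source);
    }

    // Mirrors the chain above: double each item, transform the sequence, then label it.
    static List<String> run() {
        Stream<Integer> doubled = Stream.of(1, 2, 3).map(x -> x * 2);
        return compose(doubled, INT_TO_STRING)
                .map(s -> "transformed by myTransformer: " + s)
                .collect(Collectors.toList());
    }
}
```

Because the transformer receives the whole sequence, it can bundle any number of existing operators into one reusable unit, which is the main reason to prefer this form over an item-level operator when no new per-item logic is needed.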
Copyright (c) 2016-present, RxJava Contributors.