Transforming Observables

DavidMGross edited this page Jun 5, 2013 · 83 revisions

This section explains operators with which you can transform items that are emitted by an Observable.

  • map( ) — transform the items emitted by an Observable by applying a function to each of them
  • mapMany( ) or flatMap( ) — transform the items emitted by an Observable into Observables, then flatten this into a single Observable
  • mapManyDelayError( ) — transform the items emitted by an Observable into Observables, then flatten this into a single Observable, waiting to report errors until all error-free observables have a chance to complete
  • reduce( ) or aggregate( ) — apply a function to each emitted item, sequentially, and emit only the final accumulated value
  • scan( ) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • groupBy( ) — divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key
  • buffer( ) — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time

map( )

Transform the items emitted by an Observable by applying a function to each of them

The map( ) method applies a function of your choosing to every item emitted by an Observable, and returns this transformation as a new Observable. For example, the following code maps a function that squares the incoming value onto the values in numbers:

numbers = Observable.toObservable([1, 2, 3, 4, 5]);

Observable.map(numbers, {it * it}).subscribe(
  [ onNext:{ myWriter.println(it); },
    onCompleted:{ myWriter.println("Sequence complete"); },
    onError:{ myWriter.println("Error encountered"); } ]
);

1
4
9
16
25
Sequence complete

In addition to calling map( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.map(numbers, { it * it }) ...

you could instead write

numbers.map({ it * it }) ...

mapMany( ) or flatMap( ), and mapManyDelayError( )

Transform the items emitted by an Observable into Observables, then flatten this into a single Observable

The mapMany( ) method (or flatMap( ), which behaves identically) creates a new Observable by applying a function that you supply to each item emitted by the original Observable. That function must itself return an Observable. mapMany( ) then merges the items emitted by all of these returned Observables and emits the merged results.

This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or can otherwise be transformed into Observables. With mapMany( ) you can create a new Observable that emits every item emitted by the sub-Observables of these items.

// an Observable that emits three numbers
numbers   = Observable.toObservable([1, 2, 3]);
// a closure that returns an Observable emitting two numbers derived from the number it is passed
multiples = { n -> Observable.toObservable([ n*2, n*3 ]) };

numbers.mapMany(multiples).subscribe(
  [ onNext:{ myWriter.println(it.toString()); },
    onCompleted:{ myWriter.println("Sequence complete"); },
    onError:{ myWriter.println("Error encountered"); } ]
);

2
3
4
6
6
9
Sequence complete
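The case of items that "have Observable members" can be pictured with a rough plain-Java analogy. The sketch below uses java.util.stream rather than RxJava, and the Album type with its tracks member is hypothetical, introduced only for illustration. It shows the flattening step only: a stream is pulled in order, whereas a merge of asynchronous Observables may interleave their items.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy for the flattening that mapMany( ) performs; not RxJava code.
class FlattenModel {
    // Hypothetical item type whose "tracks" member holds a sub-sequence.
    static final class Album {
        final String title;
        final List<String> tracks;

        Album(String title, List<String> tracks) {
            this.title = title;
            this.tracks = tracks;
        }
    }

    // Map each album to its sub-sequence of tracks, then flatten the
    // sub-sequences into one sequence.
    static List<String> allTracks(List<Album> albums) {
        return albums.stream()
                .flatMap(album -> album.tracks.stream())
                .collect(Collectors.toList());
    }
}
```

For two albums with tracks [a1, a2] and [b1], allTracks returns [a1, a2, b1].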

If any of the individual Observables mapped to the items from the source Observable in mapMany( ) aborts by invoking onError, the mapMany( ) call itself will immediately abort and invoke onError. If you would prefer that the map-many operation continue emitting the results of the remaining, error-free Observables before reporting the error, use mapManyDelayError( ) instead.

Because it is possible for more than one of the individual Observables to encounter an error, mapManyDelayError( ) may pass information about multiple errors to the onError method of its Observers (which it will never invoke more than once). For this reason, if you want to know the nature of these errors, you should write your onError method so that it accepts a parameter of the class CompositeException.
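The difference between the two error policies can be sketched with a small plain-Java model. This is not RxJava code: each sub-Observable is modeled as a Supplier that either returns its items or throws, the errors list stands in for what a CompositeException would carry, and the names MapManyErrorModel, run, and multiples are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the fail-fast and delay-error policies; not RxJava code.
class MapManyErrorModel {
    // What an observer would have seen: the items received and the errors reported.
    static final class Result {
        final List<Integer> items = new ArrayList<>();
        final List<RuntimeException> errors = new ArrayList<>();
    }

    // Each source item maps to a sub-sequence, modeled as a Supplier that
    // returns its items or throws to signal an error.
    static Result run(List<Integer> source,
                      Function<Integer, Supplier<List<Integer>>> mapper,
                      boolean delayError) {
        Result result = new Result();
        for (Integer item : source) {
            try {
                result.items.addAll(mapper.apply(item).get());
            } catch (RuntimeException e) {
                result.errors.add(e);
                if (!delayError) {
                    return result; // fail fast: stop at the first error
                }
                // delay-error: remember the error and keep processing
            }
        }
        return result;
    }

    // Hypothetical mapper: yields n*2 and n*3, but fails when n is 2.
    static Supplier<List<Integer>> multiples(Integer n) {
        return () -> {
            if (n == 2) {
                throw new IllegalStateException("failed for " + n);
            }
            return List.of(n * 2, n * 3);
        };
    }
}
```

With the source [1, 2, 3], the fail-fast run collects [2, 3] and stops with one error, while the delay-error run collects [2, 3, 6, 9] and reports the same single error only after the remaining sub-sequences have finished.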

reduce( ) or aggregate( )

Apply a function to each emitted item, sequentially, and emit only the final accumulated value

The reduce( ) method (or aggregate( ), which has the same behavior) returns an Observable that applies a function of your choosing to the first two items emitted by a source Observable, then feeds the result of that function along with the third item emitted by the source Observable into the same function, then feeds that result along with the fourth item into the same function, and so on until the source Observable has emitted all of its items. It then emits the result of the final call to your function as the sole item from the returned Observable.

This technique, which is called “reduce” or “aggregate” here, is sometimes called “fold,” “accumulate,” “compress,” or “inject” in other programming contexts.

For example, the following code uses reduce( ) to compute, and then emit as an Observable, the sum of the numbers emitted by the source Observable:

numbers = Observable.toObservable([1, 2, 3, 4, 5]);

Observable.reduce(numbers, { a, b -> a+b }).subscribe(
  [ onNext:{ myWriter.println(it); },
    onCompleted:{ myWriter.println("Sequence complete"); },
    onError:{ myWriter.println("Error encountered"); } ]
);

15
Sequence complete

In addition to calling reduce( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.reduce(numbers, { a, b -> a+b }) ...

you could instead write

numbers.reduce({ a, b -> a+b }) ...

There is also a version of reduce( ) to which you can pass a seed item in addition to an accumulator function:

Observable.reduce(my_observable, initial_seed, accumulator_closure)

or

my_observable.reduce(initial_seed, accumulator_closure)

Note that passing a null seed is not the same as not passing a seed. The behavior will be different. If you pass a seed of null, you will be seeding your reduction with the item null.
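To make the seed distinction concrete, here is a minimal plain-Java model of the two forms. It works on lists instead of Observables, it is not the RxJava implementation, and the ReduceModel name is an assumption of this sketch.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of seedless and seeded reduction; not RxJava code.
class ReduceModel {
    // Seedless form: the first item is the initial accumulated value.
    // This sketch assumes a non-empty list.
    static <T> T reduce(List<T> items, BinaryOperator<T> accumulator) {
        T acc = items.get(0);
        for (T item : items.subList(1, items.size())) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    // Seeded form: the seed is the initial accumulated value, even when it is null.
    static <T> T reduce(List<T> items, T seed, BinaryOperator<T> accumulator) {
        T acc = seed;
        for (T item : items) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }
}
```

With the items ["x", "y", "z"] and the accumulator (a, b) -> a + "," + b, the seedless form returns "x,y,z", while passing a null seed returns "null,x,y,z": the null takes part in the first call to the accumulator like any other seed.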

scan( )

Apply a function to each item emitted by an Observable and emit each successive value

The scan( ) method returns an Observable that emits the first item emitted by a source Observable unchanged, then applies a function of your choosing to that item and the second item emitted by the source Observable and emits the result, then feeds that result along with the third item into the same function, and so on until the source Observable has emitted all of its items. It emits the result of each of these steps from the returned Observable. This sort of function is sometimes called an “accumulator.”

For example, the following code takes an Observable that emits a consecutive sequence of n integers starting with 1 and converts it into an Observable that emits the first n triangular numbers:

numbers = Observable.toObservable([1, 2, 3, 4, 5]);

Observable.scan(numbers, { a, b -> a+b }).subscribe(
  [ onNext:{ myWriter.println(it); },
    onCompleted:{ myWriter.println("Sequence complete"); },
    onError:{ myWriter.println("Error encountered"); } ]
);

1
3
6
10
15
Sequence complete

In addition to calling scan( ) as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of

Observable.scan(numbers, { a, b -> a+b }) ...

you could instead write

numbers.scan({ a, b -> a+b }) ...

There is also a version of scan( ) to which you can pass a seed item in addition to an accumulator function:

Observable.scan(my_observable, initial_seed, accumulator_closure)

or

my_observable.scan(initial_seed, accumulator_closure)

Note: if you pass a seed item to scan( ), it will emit the seed itself as its first item.

Note also that passing a null seed is not the same as not passing a seed. The behavior will be different. If you pass a seed of null, you will be seeding your scan with null, and scan( ) will emit null as its first item.
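The effect of the seed on what gets emitted can be sketched with a plain-Java model that returns the emitted values as a list. It is an illustration under the assumption that emissions can be represented as list elements, not RxJava code, and the ScanModel name is made up here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of seedless and seeded scan; not RxJava code.
class ScanModel {
    // Seedless form: the first item is emitted unchanged, then each running result.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        if (items.isEmpty()) {
            return emitted;
        }
        T running = items.get(0);
        emitted.add(running);
        for (T item : items.subList(1, items.size())) {
            running = accumulator.apply(running, item);
            emitted.add(running);
        }
        return emitted;
    }

    // Seeded form: the seed itself is emitted first, even when it is null.
    static <T> List<T> scan(List<T> items, T seed, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T running = seed;
        emitted.add(running);
        for (T item : items) {
            running = accumulator.apply(running, item);
            emitted.add(running);
        }
        return emitted;
    }
}
```

For the items [1, 2, 3, 4, 5] and an addition accumulator, the seedless form yields [1, 3, 6, 10, 15] and the form seeded with 0 yields [0, 1, 3, 6, 10, 15]. With a null seed, the first element of the result is null, so the accumulator has to be prepared to receive null as its first argument.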

groupBy( )

Divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key

The groupBy( ) method creates or extracts a key from each of the items emitted by a source Observable. For each unique key created in this way, groupBy( ) creates a GroupedObservable that emits all of the items emitted by the source Observable that match that key. The Observable that groupBy( ) returns emits each of these GroupedObservables. A GroupedObservable has a method, getKey( ), with which you can retrieve the key that defines the GroupedObservable.

There are two versions of groupBy( ):

  1. One version takes two parameters: the source Observable and a function that takes as its parameter an item emitted by the source Observable and returns the key.
  2. The second version adds a third parameter: a function that takes as its parameter an item emitted by the source Observable and returns an item to be emitted by the resulting GroupedObservable (the first version just emits the source Observable's items unchanged).

The following sample code uses groupBy( ) to transform a list of numbers into two lists, grouped by whether or not the numbers are even:

// key function: returns true if the item is even, false if it is odd
class IsEven implements rx.util.functions.Func1
{
  java.lang.Object call(java.lang.Object item) { return(0 == (item % 2)); }
}

def numbers = Observable.toObservable([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = new IsEven();

// for each group, collect its items into a list that starts with the group's key
numbers.groupBy(groupFunc).mapMany({ Observable.reduce(it, [it.getKey()], {a, b -> a << b}) }).subscribe(
  [onNext:{ api.servletResponse.getWriter().println(it) },
   onCompleted:{ api.servletResponse.getWriter().println("Sequence complete"); },
   onError:{ api.servletResponse.getWriter().println("Error encountered"); } ]
)

[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
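The two versions of groupBy( ) described above differ only in whether the grouped items are transformed. A minimal plain-Java model, using a map of lists in place of GroupedObservables, may make that clearer. The names GroupByModel, keySelector, and elementSelector are assumptions of this sketch, not the library's parameter names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the two groupBy( ) forms; not RxJava code.
class GroupByModel {
    // Second form: one function computes the key, another computes the
    // element that is placed in the group.
    static <T, K, R> Map<K, List<R>> groupBy(List<T> items,
                                             Function<T, K> keySelector,
                                             Function<T, R> elementSelector) {
        // LinkedHashMap keeps groups in the order their keys first appear.
        Map<K, List<R>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(keySelector.apply(item), k -> new ArrayList<>())
                  .add(elementSelector.apply(item));
        }
        return groups;
    }

    // First form: the grouped elements are the source items, unchanged.
    static <T, K> Map<K, List<T>> groupBy(List<T> items, Function<T, K> keySelector) {
        return groupBy(items, keySelector, Function.identity());
    }
}
```

Grouping the numbers 1 through 9 with the key function n -> n % 2 == 0 gives {false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8]}. Adding the element function n -> n * 10 keeps the same keys but gives {false=[10, 30, 50, 70, 90], true=[20, 40, 60, 80]}.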

buffer( )

Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
