Skip to content

Commit a596f0f

Browse files
committed
Adds delay operator to Single
This commit adds the `delay(long delay, TimeUnit unit, Scheduler scheduler)` and `delay(long delay, TimeUnit unit)` operators to `rx.Single`.
1 parent 0198217 commit a596f0f

File tree

2 files changed

+91
-2
lines changed

2 files changed

+91
-2
lines changed

src/main/java/rx/Single.java

+48
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.annotations.Experimental;
2222
import rx.exceptions.Exceptions;
2323
import rx.exceptions.OnErrorNotImplementedException;
24+
import rx.functions.Action0;
2425
import rx.functions.Action1;
2526
import rx.functions.Func1;
2627
import rx.functions.Func2;
@@ -32,6 +33,7 @@
3233
import rx.functions.Func8;
3334
import rx.functions.Func9;
3435
import rx.internal.operators.OnSubscribeToObservableFuture;
36+
import rx.internal.operators.OperatorDelay;
3537
import rx.internal.operators.OperatorDoOnEach;
3638
import rx.internal.operators.OperatorMap;
3739
import rx.internal.operators.OperatorObserveOn;
@@ -1898,4 +1900,50 @@ public void onNext(T t) {
18981900

18991901
return lift(new OperatorDoOnEach<T>(observer));
19001902
}
1903+
1904+
/**
1905+
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
1906+
* specified delay. Error notifications from the source Single are not delayed.
1907+
* <p>
1908+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.s.png" alt="">
1909+
* <dl>
1910+
* <dt><b>Scheduler:</b></dt>
1911+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
1912+
* </dl>
1913+
*
1914+
* @param delay
1915+
* the delay to shift the source by
1916+
* @param unit
1917+
* the time unit of {@code delay}
1918+
* @param scheduler
1919+
* the {@link Scheduler} to use for delaying
1920+
* @return the source Single shifted in time by the specified delay
1921+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
1922+
*/
1923+
@Experimental
1924+
public final Single<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
1925+
return lift(new OperatorDelay<T>(delay, unit, scheduler));
1926+
}
1927+
1928+
/**
1929+
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
1930+
* specified delay. Error notifications from the source Observable are not delayed.
1931+
* <p>
1932+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
1933+
* <dl>
1934+
* <dt><b>Scheduler:</b></dt>
1935+
* <dd>This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.</dd>
1936+
* </dl>
1937+
*
1938+
* @param delay
1939+
* the delay to shift the source by
1940+
* @param unit
1941+
* the {@link TimeUnit} in which {@code period} is defined
1942+
* @return the source Single shifted in time by the specified delay
1943+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
1944+
*/
1945+
@Experimental
1946+
public final Single<T> delay(long delay, TimeUnit unit) {
1947+
return delay(delay, unit, Schedulers.computation());
1948+
}
19011949
}

src/test/java/rx/SingleTest.java

+43-2
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
import rx.functions.Action1;
4040
import rx.functions.Func1;
4141
import rx.functions.Func2;
42+
import rx.schedulers.TestScheduler;
4243
import rx.observers.TestSubscriber;
4344
import rx.schedulers.Schedulers;
4445
import rx.subscriptions.Subscriptions;
4546

47+
4648
public class SingleTest {
4749

4850
@Test
@@ -436,7 +438,7 @@ public void call() {
436438
fail("timed out waiting for latch");
437439
}
438440
}
439-
441+
440442
@Test
441443
public void testBackpressureAsObservable() {
442444
Single<String> s = Single.create(new OnSubscribe<String>() {
@@ -462,7 +464,7 @@ public void onStart() {
462464

463465
ts.assertValue("hello");
464466
}
465-
467+
466468
@Test
467469
public void testToObservable() {
468470
Observable<String> a = Single.just("a").toObservable();
@@ -648,4 +650,43 @@ public void doOnSuccessShouldNotSwallowExceptionThrownByAction() {
648650

649651
verify(action).call(eq("value"));
650652
}
653+
654+
@Test
655+
public void delayWithSchedulerShouldDelayCompletion() {
656+
TestScheduler scheduler = new TestScheduler();
657+
Single<Integer> single = Single.just(1).delay(100, TimeUnit.DAYS, scheduler);
658+
659+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
660+
single.subscribe(subscriber);
661+
662+
subscriber.assertNotCompleted();
663+
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
664+
subscriber.assertNotCompleted();
665+
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
666+
subscriber.assertCompleted();
667+
subscriber.assertValue(1);
668+
}
669+
670+
@Test
671+
public void delayWithSchedulerShouldShortCutWithFailure() {
672+
TestScheduler scheduler = new TestScheduler();
673+
final RuntimeException expected = new RuntimeException();
674+
Single<Integer> single = Single.create(new OnSubscribe<Integer>() {
675+
@Override
676+
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
677+
singleSubscriber.onSuccess(1);
678+
singleSubscriber.onError(expected);
679+
}
680+
}).delay(100, TimeUnit.DAYS, scheduler);
681+
682+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
683+
single.subscribe(subscriber);
684+
685+
subscriber.assertNotCompleted();
686+
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
687+
subscriber.assertNotCompleted();
688+
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
689+
subscriber.assertNoValues();
690+
subscriber.assertError(expected);
691+
}
651692
}

0 commit comments

Comments
 (0)