Skip to content

Commit 01ad7f7

Browse files
Merge pull request #423 from square/zachklipp/completable-asworker
Add Completable.asWorker() operator.
2 parents a5c703c + 9ddb5dd commit 01ad7f7

File tree

3 files changed

+68
-1
lines changed

3 files changed

+68
-1
lines changed

kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ interface Worker<out T> {
194194
* Creates a [Worker] that just performs some side effects and doesn't emit anything.
195195
*
196196
* The returned [Worker] will equate to any other workers created with this function that have
197-
* the same key.
197+
* the same key. The key is required for this builder because there is no type information
198+
* available to distinguish workers.
198199
*
199200
* E.g.:
200201
* ```

kotlin/workflow-rx2/src/main/java/com/squareup/workflow/rx2/RxWorkers.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.squareup.workflow.rx2
1818
import com.squareup.workflow.Worker
1919
import com.squareup.workflow.Worker.Emitter
2020
import com.squareup.workflow.emitAll
21+
import io.reactivex.Completable
2122
import io.reactivex.Flowable
2223
import io.reactivex.Maybe
2324
import io.reactivex.Observable
@@ -108,3 +109,15 @@ inline fun <reified T : Any> Single<out T?>.asWorker(key: String = ""): Worker<T
108109
// This !! works because RxJava types don't actually allow nulls, it's just that they can't
109110
// express that in their types because Java.
110111
Worker.from(key) { await()!! }
112+
113+
/**
114+
* Creates a [Worker] from this [Completable].
115+
*
116+
* The [Completable] will be subscribed to when the [Worker] is started, and disposed when it is
117+
* cancelled.
118+
*
119+
* The key is required for this operator because there is no type information available to
120+
* distinguish workers.
121+
*/
122+
fun Completable.asWorker(key: String) =
123+
Worker.createSideEffect(key) { await() }

kotlin/workflow-rx2/src/test/java/com/squareup/workflow/rx2/RxWorkersTest.kt

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import io.reactivex.Flowable
2121
import io.reactivex.Maybe
2222
import io.reactivex.Observable
2323
import io.reactivex.Single
24+
import io.reactivex.subjects.CompletableSubject
2425
import io.reactivex.subjects.MaybeSubject
2526
import io.reactivex.subjects.PublishSubject
2627
import io.reactivex.subjects.SingleSubject
@@ -310,4 +311,56 @@ class RxWorkersTest {
310311
}
311312

312313
// endregion
314+
315+
// region Completable
316+
317+
@Test fun `completable emits`() {
318+
val subject = CompletableSubject.create()
319+
val worker = subject.asWorker("")
320+
321+
worker.test {
322+
subject.onComplete()
323+
assertFinished()
324+
}
325+
}
326+
327+
@Test fun `completable throws`() {
328+
val subject = CompletableSubject.create()
329+
val worker = subject.asWorker("")
330+
331+
worker.test {
332+
subject.onError(ExpectedException())
333+
assertTrue(getException() is ExpectedException)
334+
}
335+
}
336+
337+
@Test fun `completable is subscribed lazily`() {
338+
var subscriptions = 0
339+
val subject = CompletableSubject.create()
340+
val worker = subject.doOnSubscribe { subscriptions++ }
341+
.asWorker("")
342+
343+
assertEquals(0, subscriptions)
344+
345+
worker.test {
346+
assertEquals(1, subscriptions)
347+
}
348+
}
349+
350+
@Test fun `completable is disposed when worker cancelled`() {
351+
var cancels = 0
352+
val subject = CompletableSubject.create()
353+
val worker = subject.doOnDispose { cancels++ }
354+
.asWorker("")
355+
356+
assertEquals(0, cancels)
357+
358+
worker.test {
359+
assertEquals(0, cancels)
360+
cancelWorker()
361+
assertEquals(1, cancels)
362+
}
363+
}
364+
365+
// endregion
313366
}

0 commit comments

Comments
 (0)