forked from kotlin-hands-on/intro-coroutines
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRequest9RxProgress.kt
33 lines (28 loc) · 1000 Bytes
/
Request9RxProgress.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package tasks
import contributors.*
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers
/**
 * Builds an [Observable] that reports contributor-loading progress for the organization `req.org`.
 *
 * The organization's repositories are requested via `service.getOrgReposRx`, and for every
 * repository its contributors are requested via `service.getRepoContributorsRx`. Each time the
 * contributors of one repository arrive, the observable emits the running total: every user
 * received so far, concatenated and passed through `aggregate()` (presumably merging duplicate
 * users -- confirm against its definition).
 *
 * The per-repository requests are combined with `flatMap`, so intermediate results are emitted
 * in arrival order, which need not match the order of the repository list.
 *
 * The running total is accumulated with `scan` instead of a `var` captured by the lambda, so
 * every subscription starts from an empty list. Re-subscribing to the returned observable
 * (including via `retry`/`repeat`) therefore never inherits users from an earlier subscription.
 *
 * @param service GitHub API client used for both the repository and the contributor requests.
 * @param req request data; only `req.org` is read here, the whole object is handed to `logRepos`.
 * @param scheduler scheduler on which the repository request and every contributor request are
 *   subscribed; defaults to [Schedulers.io].
 * @return an observable emitting one aggregated user list per repository response.
 */
fun loadContributorsReactiveProgress(
    service: GitHubService,
    req: RequestData,
    scheduler: Scheduler = Schedulers.io()
): Observable<List<User>> {
    // One emission per repository of the organization.
    val repos: Observable<Repo> = service
        .getOrgReposRx(req.org)
        .subscribeOn(scheduler)
        .doOnNext { response -> logRepos(req, response) }
        .flatMapIterable { response -> response.bodyList() }

    // One emission per repository: the contributor list of that repository.
    val repoUsers: Observable<List<User>> = repos
        .flatMap { repo ->
            service.getRepoContributorsRx(req.org, repo.name)
                .subscribeOn(scheduler)
                .doOnNext { response -> logUsers(repo, response) }
                .map { response -> response.bodyList() }
        }

    return repoUsers
        // The seed is an immutable empty list, so sharing it between subscriptions is safe.
        .scan(emptyList<User>()) { allUsers: List<User>, users: List<User> ->
            (allUsers + users).aggregate()
        }
        // A seeded scan emits the seed itself first; drop it so that only real
        // intermediate results reach the subscriber.
        .skip(1)
}