File tree 5 files changed +108
-1
lines changed
5 files changed +108
-1
lines changed Original file line number Diff line number Diff line change @@ -17,7 +17,9 @@ enum class Variant {
17
17
CONCURRENT , // Request5Concurrent
18
18
NOT_CANCELLABLE , // Request6NotCancellable
19
19
PROGRESS , // Request6Progress
20
- CHANNELS // Request7Channels
20
+ CHANNELS , // Request7Channels
21
+ RX , // Request8Rx
22
+ RX_PROGRESS // Request9RxProgress
21
23
}
22
24
23
25
interface Contributors : CoroutineScope {
@@ -108,6 +110,20 @@ interface Contributors: CoroutineScope {
108
110
}
109
111
}.setUpCancellation()
110
112
}
113
+ RX -> { // Using RxJava
114
+ loadContributorsReactive(service, req) { users, completed ->
115
+ SwingUtilities .invokeLater {
116
+ updateResults(users, startTime, completed)
117
+ }
118
+ }
119
+ }
120
+ RX_PROGRESS -> { // Using RxJava and showing progress
121
+ loadContributorsReactiveProgress(service, req) { users, completed ->
122
+ SwingUtilities .invokeLater {
123
+ updateResults(users, startTime, completed)
124
+ }
125
+ }
126
+ }
111
127
}
112
128
}
113
129
Original file line number Diff line number Diff line change @@ -38,6 +38,17 @@ interface GitHubService {
38
38
@Path(" owner" ) owner : String ,
39
39
@Path(" repo" ) repo : String
40
40
): Response <List <User >>
41
+
42
+ @GET(" orgs/{org}/repos?per_page=100" )
43
+ fun getOrgReposRx (
44
+ @Path(" org" ) org : String
45
+ ): Observable <Response <List <Repo >>>
46
+
47
+ @GET(" repos/{owner}/{repo}/contributors?per_page=100" )
48
+ fun getRepoContributorsRx (
49
+ @Path(" owner" ) owner : String ,
50
+ @Path(" repo" ) repo : String
51
+ ): Observable <Response <List <User >>>
41
52
}
42
53
43
54
@JsonIgnoreProperties(ignoreUnknown = true )
Original file line number Diff line number Diff line change
1
+ package tasks
2
+
3
+ import contributors.*
4
+ import io.reactivex.Observable
5
+ import io.reactivex.Single
6
+ import io.reactivex.rxkotlin.toObservable
7
+ import io.reactivex.rxkotlin.zipWith
8
+ import io.reactivex.schedulers.Schedulers
9
+ import kotlinx.coroutines.channels.Channel
10
+ import kotlinx.coroutines.coroutineScope
11
+ import kotlinx.coroutines.launch
12
+
13
+ fun loadContributorsReactive (
14
+ service : GitHubService ,
15
+ req : RequestData ,
16
+ callback : (List <User >, completed: Boolean ) -> Unit
17
+ ) {
18
+ val repos: Observable <Repo > = service
19
+ .getOrgReposRx(req.org)
20
+ .subscribeOn(Schedulers .io())
21
+ .doOnNext { response -> logRepos(req, response) }
22
+ .flatMapIterable { it.bodyList() }
23
+
24
+ val allUsers: Single <List <User >> = repos
25
+ .flatMap { repo ->
26
+ service.getRepoContributorsRx(req.org, repo.name)
27
+ .subscribeOn(Schedulers .io())
28
+ .doOnNext { response -> logUsers(repo, response) }
29
+ }
30
+ .flatMapIterable { it.bodyList() }
31
+ .toList()
32
+
33
+ allUsers
34
+ .doOnSuccess { callback(it.aggregate(), true ) }
35
+ .subscribe()
36
+ }
Original file line number Diff line number Diff line change
1
+ package tasks
2
+
3
+ import contributors.*
4
+ import io.reactivex.Observable
5
+ import io.reactivex.rxkotlin.toObservable
6
+ import io.reactivex.rxkotlin.zipWith
7
+ import io.reactivex.schedulers.Schedulers
8
+
9
+ fun loadContributorsReactiveProgress (
10
+ service : GitHubService ,
11
+ req : RequestData ,
12
+ callback : (List <User >, completed: Boolean ) -> Unit
13
+ ) {
14
+ val repos: Observable <Repo > = service
15
+ .getOrgReposRx(req.org)
16
+ .subscribeOn(Schedulers .io())
17
+ .doOnNext { response -> logRepos(req, response) }
18
+ .flatMapIterable { response -> response.bodyList() }
19
+
20
+ val repoUsers: Observable <List <User >> = repos
21
+ .flatMap { repo ->
22
+ service.getRepoContributorsRx(req.org, repo.name)
23
+ .subscribeOn(Schedulers .io())
24
+ .doOnNext { response -> logUsers(repo, response) }
25
+ .map { response -> response.bodyList() }
26
+ }
27
+ repoUsers
28
+ .reduce(listOf<User >()) { allUsers, users ->
29
+ (allUsers + users).aggregate().also {
30
+ callback(it, false )
31
+ }
32
+ }
33
+ .doOnSuccess { callback(it, true ) }
34
+ .subscribe()
35
+ }
Original file line number Diff line number Diff line change 1
1
package contributors
2
2
3
+ import io.reactivex.Observable
3
4
import kotlinx.coroutines.delay
4
5
import retrofit2.Call
5
6
import retrofit2.Response
@@ -24,4 +25,12 @@ object MockGithubService : GitHubService {
24
25
delay(testRepo.delay)
25
26
return Response .success(testRepo.users)
26
27
}
28
+
29
+ override fun getOrgReposRx (org : String ): Observable <Response <List <Repo >>> {
30
+ TODO ()
31
+ }
32
+
33
+ override fun getRepoContributorsRx (owner : String , repo : String ): Observable <Response <List <User >>> {
34
+ TODO ()
35
+ }
27
36
}
You can’t perform that action at this time.
0 commit comments