Skip to content

Commit 14a9a26

Browse files
authored
Merge pull request #9 from softartdev/rx
Added reactive solutions for 'testing' & 'cancellation'
2 parents d4de33c + 5d5d8d8 commit 14a9a26

File tree

7 files changed

+141
-43
lines changed

7 files changed

+141
-43
lines changed

src/contributors/Contributors.kt

+40-12
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package contributors
22

33
import contributors.Contributors.LoadingStatus.*
44
import contributors.Variant.*
5+
import io.reactivex.disposables.Disposable
56
import kotlinx.coroutines.*
6-
import kotlinx.coroutines.swing.Swing
77
import tasks.*
88
import java.awt.event.ActionListener
99
import javax.swing.SwingUtilities
@@ -111,18 +111,30 @@ interface Contributors: CoroutineScope {
111111
}.setUpCancellation()
112112
}
113113
RX -> { // Using RxJava
114-
loadContributorsReactive(service, req) { users, completed ->
115-
SwingUtilities.invokeLater {
116-
updateResults(users, startTime, completed)
117-
}
118-
}
114+
loadContributorsReactive(service, req)
115+
.subscribe { users ->
116+
SwingUtilities.invokeLater {
117+
updateResults(users, startTime)
118+
}
119+
}.setUpCancellation()
119120
}
120-
RX_PROGRESS -> { // Using RxJava and showing progress
121-
loadContributorsReactiveProgress(service, req) { users, completed ->
122-
SwingUtilities.invokeLater {
123-
updateResults(users, startTime, completed)
124-
}
125-
}
121+
RX_PROGRESS -> { // Using RxJava and showing progress { users, completed ->
122+
loadContributorsReactiveProgress(service, req)
123+
.subscribe({
124+
SwingUtilities.invokeLater {
125+
updateResults(it, startTime, false)
126+
}
127+
}, {
128+
SwingUtilities.invokeLater {
129+
setLoadingStatus("error ${it.message}", false)
130+
setActionsStatus(newLoadingEnabled = true)
131+
}
132+
}, {
133+
SwingUtilities.invokeLater {
134+
updateLoadingStatus(COMPLETED, startTime)
135+
setActionsStatus(newLoadingEnabled = true)
136+
}
137+
}).setUpCancellation()
126138
}
127139
}
128140
}
@@ -186,6 +198,22 @@ interface Contributors: CoroutineScope {
186198
}
187199
}
188200

201+
private fun Disposable.setUpCancellation() {
202+
// make active the 'cancel' button
203+
setActionsStatus(newLoadingEnabled = false, cancellationEnabled = true)
204+
205+
val loadingDisposable = this
206+
207+
// cancel the loading job if the 'cancel' button was clicked
208+
var listener: ActionListener
209+
listener = ActionListener {
210+
loadingDisposable.dispose()
211+
updateLoadingStatus(CANCELED)
212+
setActionsStatus(newLoadingEnabled = true)
213+
}
214+
addCancelListener(listener)
215+
}
216+
189217
fun loadInitialParams() {
190218
setParams(loadStoredParams())
191219
}

src/tasks/Request8Rx.kt

+7-12
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,30 @@ package tasks
22

33
import contributors.*
44
import io.reactivex.Observable
5+
import io.reactivex.Scheduler
56
import io.reactivex.Single
6-
import io.reactivex.rxkotlin.toObservable
7-
import io.reactivex.rxkotlin.zipWith
87
import io.reactivex.schedulers.Schedulers
9-
import kotlinx.coroutines.channels.Channel
10-
import kotlinx.coroutines.coroutineScope
11-
import kotlinx.coroutines.launch
128

139
fun loadContributorsReactive(
1410
service: GitHubService,
1511
req: RequestData,
16-
callback: (List<User>, completed: Boolean) -> Unit
17-
) {
12+
scheduler: Scheduler = Schedulers.io()
13+
): Single<List<User>> {
1814
val repos: Observable<Repo> = service
1915
.getOrgReposRx(req.org)
20-
.subscribeOn(Schedulers.io())
16+
.subscribeOn(scheduler)
2117
.doOnNext { response -> logRepos(req, response) }
2218
.flatMapIterable { it.bodyList() }
2319

2420
val allUsers: Single<List<User>> = repos
2521
.flatMap { repo ->
2622
service.getRepoContributorsRx(req.org, repo.name)
27-
.subscribeOn(Schedulers.io())
23+
.subscribeOn(scheduler)
2824
.doOnNext { response -> logUsers(repo, response) }
2925
}
3026
.flatMapIterable { it.bodyList() }
3127
.toList()
3228

33-
allUsers
34-
.doOnSuccess { callback(it.aggregate(), true) }
35-
.subscribe()
29+
return allUsers
30+
.map { it.aggregate() }
3631
}

src/tasks/Request9RxProgress.kt

+12-14
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,32 @@ package tasks
22

33
import contributors.*
44
import io.reactivex.Observable
5-
import io.reactivex.rxkotlin.toObservable
6-
import io.reactivex.rxkotlin.zipWith
5+
import io.reactivex.Scheduler
76
import io.reactivex.schedulers.Schedulers
87

98
fun loadContributorsReactiveProgress(
109
service: GitHubService,
1110
req: RequestData,
12-
callback: (List<User>, completed: Boolean) -> Unit
13-
) {
11+
scheduler: Scheduler = Schedulers.io()
12+
): Observable<List<User>> {
1413
val repos: Observable<Repo> = service
1514
.getOrgReposRx(req.org)
16-
.subscribeOn(Schedulers.io())
15+
.subscribeOn(scheduler)
1716
.doOnNext { response -> logRepos(req, response) }
1817
.flatMapIterable { response -> response.bodyList() }
1918

2019
val repoUsers: Observable<List<User>> = repos
2120
.flatMap { repo ->
2221
service.getRepoContributorsRx(req.org, repo.name)
23-
.subscribeOn(Schedulers.io())
22+
.subscribeOn(scheduler)
2423
.doOnNext { response -> logUsers(repo, response) }
2524
.map { response -> response.bodyList() }
2625
}
27-
repoUsers
28-
.reduce(listOf<User>()) { allUsers, users ->
29-
(allUsers + users).aggregate().also {
30-
callback(it, false)
31-
}
32-
}
33-
.doOnSuccess { callback(it, true) }
34-
.subscribe()
26+
27+
var allUsers = emptyList<User>()
28+
29+
return repoUsers.map { users: List<User> ->
30+
allUsers = (allUsers + users).aggregate()
31+
allUsers
32+
}
3533
}

test/contributors/MockGithubService.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import kotlinx.coroutines.delay
55
import retrofit2.Call
66
import retrofit2.Response
77
import retrofit2.mock.Calls
8+
import java.util.concurrent.TimeUnit
89

910
object MockGithubService : GitHubService {
1011
override fun getOrgReposCall(org: String): Call<List<Repo>> {
@@ -26,11 +27,13 @@ object MockGithubService : GitHubService {
2627
return Response.success(testRepo.users)
2728
}
2829

29-
override fun getOrgReposRx(org: String): Observable<Response<List<Repo>>> {
30-
TODO()
31-
}
30+
override fun getOrgReposRx(org: String): Observable<Response<List<Repo>>> = Observable
31+
.just(Response.success(repos))
32+
.delay(reposDelay, TimeUnit.MILLISECONDS, testScheduler)
3233

3334
override fun getRepoContributorsRx(owner: String, repo: String): Observable<Response<List<User>>> {
34-
TODO()
35+
val testRepo = reposMap.getValue(repo)
36+
return Observable.just(Response.success(testRepo.users))
37+
.delay(testRepo.delay, TimeUnit.MILLISECONDS, testScheduler)
3538
}
3639
}

test/contributors/testData.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package contributors
22

3+
import io.reactivex.schedulers.TestScheduler
4+
35
val testRequestData = RequestData("username", "password", "org")
46

57
data class TestRepo(val name: String, val delay: Long, val users: List<User>)
@@ -71,4 +73,6 @@ val concurrentProgressResults = listOf(
7173
User(login = "user-1", contributions = 10))
7274
),
7375
expectedConcurrentResults
74-
)
76+
)
77+
78+
val testScheduler = TestScheduler()

test/tasks/Request8RxKtTest.kt

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package tasks
2+
3+
import contributors.MockGithubService
4+
import contributors.expectedConcurrentResults
5+
import contributors.testRequestData
6+
import contributors.testScheduler
7+
import org.junit.Assert
8+
import org.junit.Before
9+
import org.junit.Test
10+
import java.util.concurrent.TimeUnit
11+
12+
internal class Request8RxKtTest {
13+
14+
@Before
15+
fun setUp() {
16+
testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS)
17+
}
18+
19+
@Test
20+
fun loadContributorsReactive() {
21+
val testObserver = loadContributorsReactive(MockGithubService, testRequestData, testScheduler).test()
22+
testObserver.assertNoValues()
23+
24+
val startTime = testScheduler.now(TimeUnit.MILLISECONDS)
25+
testScheduler.advanceTimeBy(expectedConcurrentResults.timeFromStart, TimeUnit.MILLISECONDS)
26+
testObserver.assertValue(expectedConcurrentResults.users)
27+
28+
val totalTime = testScheduler.now(TimeUnit.MILLISECONDS) - startTime
29+
Assert.assertEquals(
30+
"The calls run concurrently, so the total virtual time should be 2200 ms: " +
31+
"1000 ms for repos request plus max(1000, 1200, 800) = 1200 ms for concurrent contributors requests)",
32+
expectedConcurrentResults.timeFromStart, totalTime
33+
)
34+
}
35+
36+
}
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package tasks
2+
3+
import contributors.*
4+
import org.junit.Assert
5+
import org.junit.Before
6+
import org.junit.Test
7+
import java.util.concurrent.TimeUnit
8+
9+
internal class Request9RxProgressKtTest {
10+
11+
@Before
12+
fun setUp() {
13+
testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS)
14+
}
15+
16+
@Test
17+
fun loadContributorsReactiveProgress() {
18+
val testObserver = loadContributorsReactiveProgress(MockGithubService, testRequestData, testScheduler).test()
19+
testObserver.assertNoValues()
20+
21+
val startTime = testScheduler.now(TimeUnit.MILLISECONDS)
22+
23+
concurrentProgressResults.forEachIndexed { index: Int, expected: TestResults ->
24+
println("index: $index, expected: $expected")
25+
testScheduler.advanceTimeTo(expected.timeFromStart, TimeUnit.MILLISECONDS)
26+
testObserver.assertValueAt(index, expected.users)
27+
28+
val time = testScheduler.now(TimeUnit.MILLISECONDS) - startTime
29+
Assert.assertEquals("Expected intermediate result after virtual ${expected.timeFromStart} ms:",
30+
expected.timeFromStart, time)
31+
}
32+
testObserver.assertComplete()
33+
}
34+
}

0 commit comments

Comments
 (0)