diff --git a/src/contributors/Contributors.kt b/src/contributors/Contributors.kt index 682af72..0d11743 100644 --- a/src/contributors/Contributors.kt +++ b/src/contributors/Contributors.kt @@ -2,8 +2,8 @@ package contributors import contributors.Contributors.LoadingStatus.* import contributors.Variant.* +import io.reactivex.disposables.Disposable import kotlinx.coroutines.* -import kotlinx.coroutines.swing.Swing import tasks.* import java.awt.event.ActionListener import javax.swing.SwingUtilities @@ -111,18 +111,30 @@ interface Contributors: CoroutineScope { }.setUpCancellation() } RX -> { // Using RxJava - loadContributorsReactive(service, req) { users, completed -> - SwingUtilities.invokeLater { - updateResults(users, startTime, completed) - } - } + loadContributorsReactive(service, req) + .subscribe { users -> + SwingUtilities.invokeLater { + updateResults(users, startTime) + } + }.setUpCancellation() } - RX_PROGRESS -> { // Using RxJava and showing progress - loadContributorsReactiveProgress(service, req) { users, completed -> - SwingUtilities.invokeLater { - updateResults(users, startTime, completed) - } - } + RX_PROGRESS -> { // Using RxJava and showing progress { users, completed -> + loadContributorsReactiveProgress(service, req) + .subscribe({ + SwingUtilities.invokeLater { + updateResults(it, startTime, false) + } + }, { + SwingUtilities.invokeLater { + setLoadingStatus("error ${it.message}", false) + setActionsStatus(newLoadingEnabled = true) + } + }, { + SwingUtilities.invokeLater { + updateLoadingStatus(COMPLETED, startTime) + setActionsStatus(newLoadingEnabled = true) + } + }).setUpCancellation() } } } @@ -186,6 +198,22 @@ interface Contributors: CoroutineScope { } } + private fun Disposable.setUpCancellation() { + // make active the 'cancel' button + setActionsStatus(newLoadingEnabled = false, cancellationEnabled = true) + + val loadingDisposable = this + + // cancel the loading job if the 'cancel' button was clicked + var listener: ActionListener + listener = ActionListener { + loadingDisposable.dispose() + updateLoadingStatus(CANCELED) + setActionsStatus(newLoadingEnabled = true) + } + addCancelListener(listener) + } + fun loadInitialParams() { setParams(loadStoredParams()) } diff --git a/src/tasks/Request8Rx.kt b/src/tasks/Request8Rx.kt index 6e4b57b..946a97c 100644 --- a/src/tasks/Request8Rx.kt +++ b/src/tasks/Request8Rx.kt @@ -2,35 +2,30 @@ package tasks import contributors.* import io.reactivex.Observable +import io.reactivex.Scheduler import io.reactivex.Single -import io.reactivex.rxkotlin.toObservable -import io.reactivex.rxkotlin.zipWith import io.reactivex.schedulers.Schedulers -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch fun loadContributorsReactive( service: GitHubService, req: RequestData, - callback: (List, completed: Boolean) -> Unit -) { + scheduler: Scheduler = Schedulers.io() +): Single> { val repos: Observable = service .getOrgReposRx(req.org) - .subscribeOn(Schedulers.io()) + .subscribeOn(scheduler) .doOnNext { response -> logRepos(req, response) } .flatMapIterable { it.bodyList() } val allUsers: Single> = repos .flatMap { repo -> service.getRepoContributorsRx(req.org, repo.name) - .subscribeOn(Schedulers.io()) + .subscribeOn(scheduler) .doOnNext { response -> logUsers(repo, response) } } .flatMapIterable { it.bodyList() } .toList() - allUsers - .doOnSuccess { callback(it.aggregate(), true) } - .subscribe() + return allUsers + .map { it.aggregate() } } \ No newline at end of file diff --git a/src/tasks/Request9RxProgress.kt b/src/tasks/Request9RxProgress.kt index e916e54..2ae8fcb 100644 --- a/src/tasks/Request9RxProgress.kt +++ b/src/tasks/Request9RxProgress.kt @@ -2,34 +2,32 @@ package tasks import contributors.* import io.reactivex.Observable -import io.reactivex.rxkotlin.toObservable -import io.reactivex.rxkotlin.zipWith +import io.reactivex.Scheduler import io.reactivex.schedulers.Schedulers fun loadContributorsReactiveProgress( service: GitHubService, req: RequestData, - callback: (List, completed: Boolean) -> Unit -) { + scheduler: Scheduler = Schedulers.io() +): Observable> { val repos: Observable = service .getOrgReposRx(req.org) - .subscribeOn(Schedulers.io()) + .subscribeOn(scheduler) .doOnNext { response -> logRepos(req, response) } .flatMapIterable { response -> response.bodyList() } val repoUsers: Observable> = repos .flatMap { repo -> service.getRepoContributorsRx(req.org, repo.name) - .subscribeOn(Schedulers.io()) + .subscribeOn(scheduler) .doOnNext { response -> logUsers(repo, response) } .map { response -> response.bodyList() } } - repoUsers - .reduce(listOf()) { allUsers, users -> - (allUsers + users).aggregate().also { - callback(it, false) - } - } - .doOnSuccess { callback(it, true) } - .subscribe() + + var allUsers = emptyList() + + return repoUsers.map { users: List -> + allUsers = (allUsers + users).aggregate() + allUsers + } } \ No newline at end of file diff --git a/test/contributors/MockGithubService.kt b/test/contributors/MockGithubService.kt index a3aee83..2315bf3 100644 --- a/test/contributors/MockGithubService.kt +++ b/test/contributors/MockGithubService.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.delay import retrofit2.Call import retrofit2.Response import retrofit2.mock.Calls +import java.util.concurrent.TimeUnit object MockGithubService : GitHubService { override fun getOrgReposCall(org: String): Call> { @@ -26,11 +27,13 @@ object MockGithubService : GitHubService { return Response.success(testRepo.users) } - override fun getOrgReposRx(org: String): Observable>> { - TODO() - } + override fun getOrgReposRx(org: String): Observable>> = Observable + .just(Response.success(repos)) + .delay(reposDelay, TimeUnit.MILLISECONDS, testScheduler) override fun getRepoContributorsRx(owner: String, repo: String): Observable>> { - TODO() + val testRepo = reposMap.getValue(repo) + return Observable.just(Response.success(testRepo.users)) + .delay(testRepo.delay, TimeUnit.MILLISECONDS, testScheduler) } } \ No newline at end of file diff --git a/test/contributors/testData.kt b/test/contributors/testData.kt index 0a59987..e5ab518 100644 --- a/test/contributors/testData.kt +++ b/test/contributors/testData.kt @@ -1,5 +1,7 @@ package contributors +import io.reactivex.schedulers.TestScheduler + val testRequestData = RequestData("username", "password", "org") data class TestRepo(val name: String, val delay: Long, val users: List) @@ -71,4 +73,6 @@ val concurrentProgressResults = listOf( User(login = "user-1", contributions = 10)) ), expectedConcurrentResults -) \ No newline at end of file +) + +val testScheduler = TestScheduler() \ No newline at end of file diff --git a/test/tasks/Request8RxKtTest.kt b/test/tasks/Request8RxKtTest.kt new file mode 100644 index 0000000..5b461b3 --- /dev/null +++ b/test/tasks/Request8RxKtTest.kt @@ -0,0 +1,36 @@ +package tasks + +import contributors.MockGithubService +import contributors.expectedConcurrentResults +import contributors.testRequestData +import contributors.testScheduler +import org.junit.Assert +import org.junit.Before +import org.junit.Test +import java.util.concurrent.TimeUnit + +internal class Request8RxKtTest { + + @Before + fun setUp() { + testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS) + } + + @Test + fun loadContributorsReactive() { + val testObserver = loadContributorsReactive(MockGithubService, testRequestData, testScheduler).test() + testObserver.assertNoValues() + + val startTime = testScheduler.now(TimeUnit.MILLISECONDS) + testScheduler.advanceTimeBy(expectedConcurrentResults.timeFromStart, TimeUnit.MILLISECONDS) + testObserver.assertValue(expectedConcurrentResults.users) + + val totalTime = testScheduler.now(TimeUnit.MILLISECONDS) - startTime + Assert.assertEquals( + "The calls run concurrently, so the total virtual time should be 2200 ms: " + + "1000 ms for repos request plus max(1000, 1200, 800) = 1200 ms for concurrent contributors requests)", + expectedConcurrentResults.timeFromStart, totalTime + ) + } + +} \ No newline at end of file diff --git a/test/tasks/Request9RxProgressKtTest.kt b/test/tasks/Request9RxProgressKtTest.kt new file mode 100644 index 0000000..07cb6d4 --- /dev/null +++ b/test/tasks/Request9RxProgressKtTest.kt @@ -0,0 +1,34 @@ +package tasks + +import contributors.* +import org.junit.Assert +import org.junit.Before +import org.junit.Test +import java.util.concurrent.TimeUnit + +internal class Request9RxProgressKtTest { + + @Before + fun setUp() { + testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS) + } + + @Test + fun loadContributorsReactiveProgress() { + val testObserver = loadContributorsReactiveProgress(MockGithubService, testRequestData, testScheduler).test() + testObserver.assertNoValues() + + val startTime = testScheduler.now(TimeUnit.MILLISECONDS) + + concurrentProgressResults.forEachIndexed { index: Int, expected: TestResults -> + println("index: $index, expected: $expected") + testScheduler.advanceTimeTo(expected.timeFromStart, TimeUnit.MILLISECONDS) + testObserver.assertValueAt(index, expected.users) + + val time = testScheduler.now(TimeUnit.MILLISECONDS) - startTime + Assert.assertEquals("Expected intermediate result after virtual ${expected.timeFromStart} ms:", + expected.timeFromStart, time) + } + testObserver.assertComplete() + } +} \ No newline at end of file