Skip to content

Added reactive solutions for 'testing' & 'cancellation' #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions src/contributors/Contributors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package contributors

import contributors.Contributors.LoadingStatus.*
import contributors.Variant.*
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.*
import kotlinx.coroutines.swing.Swing
import tasks.*
import java.awt.event.ActionListener
import javax.swing.SwingUtilities
Expand Down Expand Up @@ -111,18 +111,30 @@ interface Contributors: CoroutineScope {
}.setUpCancellation()
}
RX -> { // Using RxJava
loadContributorsReactive(service, req) { users, completed ->
SwingUtilities.invokeLater {
updateResults(users, startTime, completed)
}
}
loadContributorsReactive(service, req)
.subscribe { users ->
SwingUtilities.invokeLater {
updateResults(users, startTime)
}
}.setUpCancellation()
}
RX_PROGRESS -> { // Using RxJava and showing progress
loadContributorsReactiveProgress(service, req) { users, completed ->
SwingUtilities.invokeLater {
updateResults(users, startTime, completed)
}
}
RX_PROGRESS -> { // Using RxJava and showing progress { users, completed ->
loadContributorsReactiveProgress(service, req)
.subscribe({
SwingUtilities.invokeLater {
updateResults(it, startTime, false)
}
}, {
SwingUtilities.invokeLater {
setLoadingStatus("error ${it.message}", false)
setActionsStatus(newLoadingEnabled = true)
}
}, {
SwingUtilities.invokeLater {
updateLoadingStatus(COMPLETED, startTime)
setActionsStatus(newLoadingEnabled = true)
}
}).setUpCancellation()
}
}
}
Expand Down Expand Up @@ -186,6 +198,22 @@ interface Contributors: CoroutineScope {
}
}

private fun Disposable.setUpCancellation() {
// make active the 'cancel' button
setActionsStatus(newLoadingEnabled = false, cancellationEnabled = true)

val loadingDisposable = this

// cancel the loading job if the 'cancel' button was clicked
var listener: ActionListener
listener = ActionListener {
loadingDisposable.dispose()
updateLoadingStatus(CANCELED)
setActionsStatus(newLoadingEnabled = true)
}
addCancelListener(listener)
}

fun loadInitialParams() {
setParams(loadStoredParams())
}
Expand Down
19 changes: 7 additions & 12 deletions src/tasks/Request8Rx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,30 @@ package tasks

import contributors.*
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.rxkotlin.toObservable
import io.reactivex.rxkotlin.zipWith
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

fun loadContributorsReactive(
service: GitHubService,
req: RequestData,
callback: (List<User>, completed: Boolean) -> Unit
) {
scheduler: Scheduler = Schedulers.io()
): Single<List<User>> {
val repos: Observable<Repo> = service
.getOrgReposRx(req.org)
.subscribeOn(Schedulers.io())
.subscribeOn(scheduler)
.doOnNext { response -> logRepos(req, response) }
.flatMapIterable { it.bodyList() }

val allUsers: Single<List<User>> = repos
.flatMap { repo ->
service.getRepoContributorsRx(req.org, repo.name)
.subscribeOn(Schedulers.io())
.subscribeOn(scheduler)
.doOnNext { response -> logUsers(repo, response) }
}
.flatMapIterable { it.bodyList() }
.toList()

allUsers
.doOnSuccess { callback(it.aggregate(), true) }
.subscribe()
return allUsers
.map { it.aggregate() }
}
26 changes: 12 additions & 14 deletions src/tasks/Request9RxProgress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,32 @@ package tasks

import contributors.*
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
import io.reactivex.rxkotlin.zipWith
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers

fun loadContributorsReactiveProgress(
service: GitHubService,
req: RequestData,
callback: (List<User>, completed: Boolean) -> Unit
) {
scheduler: Scheduler = Schedulers.io()
): Observable<List<User>> {
val repos: Observable<Repo> = service
.getOrgReposRx(req.org)
.subscribeOn(Schedulers.io())
.subscribeOn(scheduler)
.doOnNext { response -> logRepos(req, response) }
.flatMapIterable { response -> response.bodyList() }

val repoUsers: Observable<List<User>> = repos
.flatMap { repo ->
service.getRepoContributorsRx(req.org, repo.name)
.subscribeOn(Schedulers.io())
.subscribeOn(scheduler)
.doOnNext { response -> logUsers(repo, response) }
.map { response -> response.bodyList() }
}
repoUsers
.reduce(listOf<User>()) { allUsers, users ->
(allUsers + users).aggregate().also {
callback(it, false)
}
}
.doOnSuccess { callback(it, true) }
.subscribe()

var allUsers = emptyList<User>()

return repoUsers.map { users: List<User> ->
allUsers = (allUsers + users).aggregate()
allUsers
}
}
11 changes: 7 additions & 4 deletions test/contributors/MockGithubService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.delay
import retrofit2.Call
import retrofit2.Response
import retrofit2.mock.Calls
import java.util.concurrent.TimeUnit

object MockGithubService : GitHubService {
override fun getOrgReposCall(org: String): Call<List<Repo>> {
Expand All @@ -26,11 +27,13 @@ object MockGithubService : GitHubService {
return Response.success(testRepo.users)
}

override fun getOrgReposRx(org: String): Observable<Response<List<Repo>>> {
TODO()
}
override fun getOrgReposRx(org: String): Observable<Response<List<Repo>>> = Observable
.just(Response.success(repos))
.delay(reposDelay, TimeUnit.MILLISECONDS, testScheduler)

override fun getRepoContributorsRx(owner: String, repo: String): Observable<Response<List<User>>> {
TODO()
val testRepo = reposMap.getValue(repo)
return Observable.just(Response.success(testRepo.users))
.delay(testRepo.delay, TimeUnit.MILLISECONDS, testScheduler)
}
}
6 changes: 5 additions & 1 deletion test/contributors/testData.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package contributors

import io.reactivex.schedulers.TestScheduler

val testRequestData = RequestData("username", "password", "org")

data class TestRepo(val name: String, val delay: Long, val users: List<User>)
Expand Down Expand Up @@ -71,4 +73,6 @@ val concurrentProgressResults = listOf(
User(login = "user-1", contributions = 10))
),
expectedConcurrentResults
)
)

val testScheduler = TestScheduler()
36 changes: 36 additions & 0 deletions test/tasks/Request8RxKtTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package tasks

import contributors.MockGithubService
import contributors.expectedConcurrentResults
import contributors.testRequestData
import contributors.testScheduler
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit

internal class Request8RxKtTest {

@Before
fun setUp() {
testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS)
}

@Test
fun loadContributorsReactive() {
val testObserver = loadContributorsReactive(MockGithubService, testRequestData, testScheduler).test()
testObserver.assertNoValues()

val startTime = testScheduler.now(TimeUnit.MILLISECONDS)
testScheduler.advanceTimeBy(expectedConcurrentResults.timeFromStart, TimeUnit.MILLISECONDS)
testObserver.assertValue(expectedConcurrentResults.users)

val totalTime = testScheduler.now(TimeUnit.MILLISECONDS) - startTime
Assert.assertEquals(
"The calls run concurrently, so the total virtual time should be 2200 ms: " +
"1000 ms for repos request plus max(1000, 1200, 800) = 1200 ms for concurrent contributors requests)",
expectedConcurrentResults.timeFromStart, totalTime
)
}

}
34 changes: 34 additions & 0 deletions test/tasks/Request9RxProgressKtTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package tasks

import contributors.*
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit

internal class Request9RxProgressKtTest {

@Before
fun setUp() {
testScheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS)
}

@Test
fun loadContributorsReactiveProgress() {
val testObserver = loadContributorsReactiveProgress(MockGithubService, testRequestData, testScheduler).test()
testObserver.assertNoValues()

val startTime = testScheduler.now(TimeUnit.MILLISECONDS)

concurrentProgressResults.forEachIndexed { index: Int, expected: TestResults ->
println("index: $index, expected: $expected")
testScheduler.advanceTimeTo(expected.timeFromStart, TimeUnit.MILLISECONDS)
testObserver.assertValueAt(index, expected.users)

val time = testScheduler.now(TimeUnit.MILLISECONDS) - startTime
Assert.assertEquals("Expected intermediate result after virtual ${expected.timeFromStart} ms:",
expected.timeFromStart, time)
}
testObserver.assertComplete()
}
}