Skip to content

Commit 77d1aea

Browse files
committed
Improve code style and test coverage
1 parent 8a059f5 commit 77d1aea

File tree

2 files changed

+57
-10
lines changed

2 files changed

+57
-10
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ private class PublisherCoroutine<in T>(
160160

161161
private fun unlockAndCheckCompleted() {
162162
/*
163-
There is no sense to check completion before doing `unlock`, because completion might
164-
happen after this check and before `unlock` (see `signalCompleted` that does not do anything
165-
if it fails to acquire the lock that we are still holding).
166-
We have to recheck `isCompleted` after `unlock` anyway.
163+
* There is no sense to check completion before doing `unlock`, because completion might
164+
* happen after this check and before `unlock` (see `signalCompleted` that does not do anything
165+
* if it fails to acquire the lock that we are still holding).
166+
* We have to recheck `isCompleted` after `unlock` anyway.
167167
*/
168168
mutex.unlock()
169169
// check isCompleted and and try to regain lock to signal completion
@@ -180,13 +180,17 @@ private class PublisherCoroutine<in T>(
180180
if (cancelled) {
181181
// If the parent had failed to handle our exception (handleJobException was invoked), then
182182
// we must not loose this exception
183-
if (shouldHandleException && cause != null) handleExceptionViaHandler(parentContext, cause)
183+
if (shouldHandleException && cause != null) {
184+
handleExceptionViaHandler(parentContext, cause)
185+
}
184186
} else {
185187
try {
186-
if (cause != null && cause !is CancellationException)
188+
if (cause != null && cause !is CancellationException) {
187189
subscriber.onError(cause)
188-
else
190+
}
191+
else {
189192
subscriber.onComplete()
193+
}
190194
} catch (e: Throwable) {
191195
handleExceptionViaHandler(parentContext, e)
192196
}
@@ -261,4 +265,4 @@ private class PublisherCoroutine<in T>(
261265
cancelled = true
262266
super.cancel()
263267
}
264-
}
268+
}

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

+45-2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,51 @@ class PublishTest : TestBase() {
131131
finish(6)
132132
}
133133

134+
@Test
135+
fun testParentHandlesFailure() = runTest {
136+
expect(1)
137+
val deferred = CompletableDeferred<Unit>()
138+
val publisher = publish<Unit>(deferred + Dispatchers.Unconfined) {
139+
try {
140+
expect(3)
141+
delay(10000)
142+
} finally {
143+
expect(5)
144+
throw TestException("FAILED")
145+
}
146+
}
147+
var sub: Subscription? = null
148+
publisher.subscribe(object : Subscriber<Unit> {
149+
override fun onComplete() {
150+
expectUnreached()
151+
}
152+
153+
override fun onSubscribe(s: Subscription) {
154+
expect(2)
155+
sub = s
156+
}
157+
158+
override fun onNext(t: Unit?) {
159+
expectUnreached()
160+
}
161+
162+
override fun onError(t: Throwable?) {
163+
expectUnreached()
164+
}
165+
})
166+
expect(4)
167+
sub!!.cancel()
168+
169+
try {
170+
deferred.await()
171+
expectUnreached()
172+
} catch (e: TestException) {
173+
expect(6)
174+
}
175+
176+
finish(7)
177+
}
178+
134179
@Test
135180
fun testPublishFailureCancelsParent() = runTest(
136181
expected = { it is TestException }
@@ -207,6 +252,4 @@ class PublishTest : TestBase() {
207252
latch.await()
208253
finish(8)
209254
}
210-
211-
private class TestException : Exception()
212255
}

0 commit comments

Comments
 (0)