Skip to content

Version 1.2.0 #1089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,18 @@ To release new `<version>` of `kotlinx-coroutines`:

5. Announce new release in [Slack](https://kotlinlang.slack.com)

6. Switch into `develop` branch:<br>
6. Create a ticket to update coroutines version on [try.kotlinlang.org](try.kotlinlang.org).
* Use [KT-30870](https://youtrack.jetbrains.com/issue/KT-30870) as a template
* This step should be skipped for eap versions that are not merged to `master`

7. Switch into `develop` branch:<br>
`git checkout develop`

7. Fetch the latest `master`:<br>
8. Fetch the latest `master`:<br>
`git fetch`

8. Merge release from `master`:<br>
9. Merge release from `master`:<br>
`git merge origin/master`

9. Push updates to `develop`:<br>
10. Push updates to `develop`:<br>
`git push`
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Kotlin
version=1.2.0-alpha-2-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.3.21
kotlin_version=1.3.30

# Dependencies
junit_version=4.12
atomicFU_version=0.12.2
atomicFU_version=0.12.3
html_version=0.6.8
lincheck_version=2.0
dokka_version=0.9.16-rdev-2-mpp-hacks
Expand Down
2 changes: 1 addition & 1 deletion integration/kotlinx-coroutines-play-services/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.util.zip.ZipFile
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

ext.tasks_version = '15.0.1'
ext.tasks_version = '16.0.1'

def attr = Attribute.of("artifactType", String.class)
configurations {
Expand Down
12 changes: 8 additions & 4 deletions integration/kotlinx-coroutines-play-services/src/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public fun <T> Task<T>.asDeferred(): Deferred<T> {
if (isComplete) {
val e = exception
return if (e == null) {
CompletableDeferred<T>().apply { if (isCanceled) cancel() else complete(result) }
@Suppress("UNCHECKED_CAST")
CompletableDeferred<T>().apply { if (isCanceled) cancel() else complete(result as T) }
} else {
CompletableDeferred<T>().apply { completeExceptionally(e) }
}
Expand All @@ -60,7 +61,8 @@ public fun <T> Task<T>.asDeferred(): Deferred<T> {
addOnCompleteListener {
val e = it.exception
if (e == null) {
if (isCanceled) result.cancel() else result.complete(it.result)
@Suppress("UNCHECKED_CAST")
if (isCanceled) result.cancel() else result.complete(it.result as T)
} else {
result.completeExceptionally(e)
}
Expand All @@ -83,7 +85,8 @@ public suspend fun <T> Task<T>.await(): T {
if (isCanceled) {
throw CancellationException("Task $this was cancelled normally.")
} else {
result
@Suppress("UNCHECKED_CAST")
result as T
}
} else {
throw e
Expand All @@ -94,7 +97,8 @@ public suspend fun <T> Task<T>.await(): T {
addOnCompleteListener {
val e = exception
if (e == null) {
if (isCanceled) cont.cancel() else cont.resume(result)
@Suppress("UNCHECKED_CAST")
if (isCanceled) cont.cancel() else cont.resume(result as T)
} else {
cont.resumeWithException(e)
}
Expand Down
5 changes: 5 additions & 0 deletions integration/kotlinx-coroutines-play-services/test/TaskTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class TaskTest : TestBase() {
assertEquals(42, deferred.await())
}

@Test
fun testNullResultTaskAsDeferred() = runTest {
assertNull(Tasks.forResult(null).asDeferred().await())
}

@Test
fun testCancelledTaskAsDeferred() = runTest {
val deferred = Tasks.forCanceled<Int>().asDeferred()
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ Synchronization primitives (mutex).

# Package kotlinx.coroutines.channels

Channels -- non-blocking primitives for communicating a stream of elements between coroutines.
Channels &mdash; non-blocking primitives for communicating a stream of elements between coroutines.

# Package kotlinx.coroutines.flow

Flow &mdash; asynchronous cold stream of elements.

# Package kotlinx.coroutines.selects

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
// ------ ReceiveChannel ------

public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
public final override val isEmpty: Boolean get() = empty
private val empty: Boolean get() = queue.nextNode !is Send && isBufferEmpty // TODO rename to `isEmpty`
public final override val isEmpty: Boolean get() = queue.nextNode !is Send && isBufferEmpty

@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
Expand Down Expand Up @@ -750,7 +749,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
while (true) {
if (select.isSelected) return
if (empty) {
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
Expand Down Expand Up @@ -784,7 +783,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
while (true) {
if (select.isSelected) return
if (empty) {
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,8 @@ public interface ReceiveChannel<out E> {
/**
* Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
* This function returns `false` for [isClosedForReceive] channel.
*
* @suppress **Will be removed in next releases, no replacement.**
*/
@ExperimentalCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement")
public val isEmpty: Boolean

/**
Expand Down
24 changes: 19 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,33 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
}

/**
* Creates an instance of the cold [Flow] from a supplied [SendChannel].
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code. It allows elements to be
* produced by the code that is running in a different context,
* e.g. from a callback-based API.
*
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
*
* Example of usage:
*
* ```
* fun flowFrom(api: CallbackBasedApi): Flow<Int> = flowViaChannel { channel ->
* val adapter = FlowSinkAdapter(channel) // implementation of callback interface
* api.register(adapter)
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
* val callback = object : Callback { // implementation of some callback interface
* override fun onNextValue(value: T) {
* channel.offer(value) // Note: offer drops value when buffer is full
* }
* override fun onApiError(cause: Throwable) {
* channel.cancel("API Error", CancellationException(cause))
* }
* }
* api.register(callback)
* channel.invokeOnClose {
* api.unregister(adapter)
* api.unregister(callback)
* }
* }
* ```
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlinx.coroutines.*
*
* Flow does not carry information whether it is a cold stream (that can be collected multiple times and
* triggers its evaluation every time collection is executed) or hot one, but conventionally flow represents a cold stream.
* Transitions between hot and cold streams are support via channels and corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
* Transitions between hot and cold streams are supported via channels and corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
*
* Flow is a **pure** concept: it encapsulates its own execution context and never propagates it to the downstream, thus making
* reasoning about execution context of particular transformations or terminal operations trivial.
Expand Down
14 changes: 9 additions & 5 deletions kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ class ArrayChannelTest : TestBase() {
@Test
fun testSimple() = runTest {
val q = Channel<Int>(1)
check(q.isEmpty)
expect(1)
val sender = launch {
expect(4)
q.send(1) // success -- buffered
check(!q.isEmpty)
expect(5)
q.send(2) // suspends (buffer full)
expect(9)
Expand All @@ -23,27 +25,29 @@ class ArrayChannelTest : TestBase() {
val receiver = launch {
expect(6)
check(q.receive() == 1) // does not suspend -- took from buffer
check(!q.isEmpty) // waiting sender's element moved to buffer
expect(7)
check(q.receive() == 2) // does not suspend (takes from sender)
expect(8)
}
expect(3)
sender.join()
receiver.join()
check(q.isEmpty)
finish(10)
}

@Test
fun testClosedBufferedReceiveOrNull() = runTest {
val q = Channel<Int>(1)
check(!q.isClosedForSend && !q.isClosedForReceive)
check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(5)
check(q.isClosedForSend && !q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive)
assertEquals(42, q.receiveOrNull())
expect(6)
check(q.isClosedForSend && q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
assertEquals(null, q.receiveOrNull())
expect(7)
}
Expand All @@ -52,9 +56,9 @@ class ArrayChannelTest : TestBase() {
expect(3)
q.close() // goes on
expect(4)
check(q.isClosedForSend && !q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive)
yield()
check(q.isClosedForSend && q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
finish(8)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RendezvousChannelTest : TestBase() {
@Test
fun testSimple() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
check(q.isEmpty)
expect(1)
val sender = launch {
expect(4)
Expand All @@ -30,13 +31,14 @@ class RendezvousChannelTest : TestBase() {
expect(3)
sender.join()
receiver.join()
check(q.isEmpty)
finish(10)
}

@Test
fun testClosedReceiveOrNull() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
check(!q.isClosedForSend && !q.isClosedForReceive)
check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(3)
Expand All @@ -49,9 +51,9 @@ class RendezvousChannelTest : TestBase() {
q.send(42)
expect(5)
q.close()
check(q.isClosedForSend && q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
yield()
check(q.isClosedForSend && q.isClosedForReceive)
check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
finish(7)
}

Expand Down Expand Up @@ -252,7 +254,7 @@ class RendezvousChannelTest : TestBase() {
expect(1)
send(bad)
}
assertTrue(c.receive() === bad)
assertSame(c.receive(), bad)
finish(2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ private class ChannelViaBroadcast<E>(
val sub = broadcast.openSubscription()

override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
@Suppress("DEPRECATION_ERROR")
override val isEmpty: Boolean get() = sub.isEmpty

override suspend fun receive(): E = sub.receive()
Expand Down