From cf9db6c01aaa6047def015c5a3c092ef55fad284 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 9 Aug 2018 16:17:58 +0300 Subject: [PATCH] Introduce typesafe actors abstraction Fixes #87 and #169 --- .../kotlinx-coroutines-core.txt | 33 ++ .../src/actors/AbstractActor.kt | 76 +++++ .../src/actors/Actor.kt | 65 ++++ .../src/actors/ActorTraits.kt | 82 +++++ .../src/actors/Actors.kt | 49 +++ .../src/actors/TypedActor.kt | 80 +++++ core/kotlinx-coroutines-core/test/TestBase.kt | 8 +- .../ActorSequentialProcessingStressTest.kt | 96 ++++++ .../test/actors/ActorTest.kt | 100 ++++++ .../test/actors/ActorsBaseTest.kt | 304 ++++++++++++++++++ .../test/actors/TypedActorTest.kt | 109 +++++++ .../test/guide/example-actors-01.kt | 46 +++ .../test/guide/example-actors-02.kt | 41 +++ .../test/guide/test/GuideTest.kt | 16 +- coroutines-guide.md | 155 +++++++-- 15 files changed, 1224 insertions(+), 36 deletions(-) create mode 100644 core/kotlinx-coroutines-core/src/actors/AbstractActor.kt create mode 100644 core/kotlinx-coroutines-core/src/actors/Actor.kt create mode 100644 core/kotlinx-coroutines-core/src/actors/ActorTraits.kt create mode 100644 core/kotlinx-coroutines-core/src/actors/Actors.kt create mode 100644 core/kotlinx-coroutines-core/src/actors/TypedActor.kt create mode 100644 core/kotlinx-coroutines-core/test/actors/ActorSequentialProcessingStressTest.kt create mode 100644 core/kotlinx-coroutines-core/test/actors/ActorTest.kt create mode 100644 core/kotlinx-coroutines-core/test/actors/ActorsBaseTest.kt create mode 100644 core/kotlinx-coroutines-core/test/actors/TypedActorTest.kt create mode 100644 core/kotlinx-coroutines-core/test/guide/example-actors-01.kt create mode 100644 core/kotlinx-coroutines-core/test/guide/example-actors-02.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a9aa842d9a..40c61cde48 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -462,6 +462,39 @@ public final class kotlinx/coroutines/experimental/YieldKt { public static final fun yield (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; } +public abstract class kotlinx/coroutines/experimental/actors/Actor { + public fun ()V + public fun (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V + public synthetic fun (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V + protected final fun act (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; +} + +public abstract class kotlinx/coroutines/experimental/actors/ActorTraits { + public fun ()V + public abstract fun cancel ()V + public abstract fun close ()V + public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job; + public final fun join (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; + protected fun onClose ()V + protected fun onStart (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; +} + +public final class kotlinx/coroutines/experimental/actors/ActorsKt { + public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor; + public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor; + public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor; + public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor; +} + +public abstract class kotlinx/coroutines/experimental/actors/TypedActor { + public fun ()V + public fun (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V + public synthetic fun (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun offer (Ljava/lang/Object;)Z + protected abstract fun receive (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; + public final fun send (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; +} + public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel { public fun ()V public fun cancel (Ljava/lang/Throwable;)Z diff --git a/core/kotlinx-coroutines-core/src/actors/AbstractActor.kt b/core/kotlinx-coroutines-core/src/actors/AbstractActor.kt new file mode 100644 index 0000000000..c6349a2be4 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/actors/AbstractActor.kt @@ -0,0 +1,76 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.atomicfu.* +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import kotlin.coroutines.experimental.* + +/** + * Base class for actors implementation, which provides implementation for [ActorTraits] + * This class is not designed to be extended outside of kotlinx.coroutines, so it's internal + * + * @param T type of messages which are stored in the mailbox + */ +internal abstract class AbstractActor( + context: CoroutineContext = DefaultDispatcher, + parent: Job? = null, + start: CoroutineStart = CoroutineStart.LAZY, + channelCapacity: Int = 16 +) : ActorTraits() { + + internal val mailbox = Channel(channelCapacity) + public final override val job: Job = launch(context, start, parent) { actorLoop() } + + /* + * Guard for onClose. + * It's necessary to invoke onClose in the end of actor body even when we have job completion: + * if actor decides to decompose its work, then onClose should be called *before* actor's body end, + * otherwise delegated work will never be closed, because job completion will await all created children + * to complete + */ + private val onCloseInvoked = atomic(0) + + // Save an allocation + private inner class OnCloseNode : JobNode(job) { + override fun invoke(cause: Throwable?) { + if (onCloseInvoked.compareAndSet(0, 1)) { + onClose() + } + } + } + + init { + job.invokeOnCompletion(OnCloseNode()) + } + + public override fun close() { + mailbox.close() + } + + public override fun cancel() { + job.cancel() + mailbox.cancel() + } + + private suspend fun actorLoop() { + try { + onStart() + for (message in mailbox) { + onMessage(message) + } + } catch (e: Throwable) { + handleCoroutineException(coroutineContext, e) + } finally { + mailbox.close() + if (onCloseInvoked.compareAndSet(0, 1)) { + onClose() + } + } + } + + internal abstract suspend fun onMessage(message: T) +} diff --git a/core/kotlinx-coroutines-core/src/actors/Actor.kt b/core/kotlinx-coroutines-core/src/actors/Actor.kt new file mode 100644 index 0000000000..c78ecba05e --- /dev/null +++ b/core/kotlinx-coroutines-core/src/actors/Actor.kt @@ -0,0 +1,65 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import kotlin.coroutines.experimental.* + +/** + * [Actor] is the base for all stateful actors, who have to process more than one type of messages. + * [Actor] has well-defined lifecycle described in [ActorTraits]. + * + * To declare message handler, actor should have methods declared using [act], + * which are used to send message "Send message which handler invokes `act` body" + * + * Example, where the actor asynchronously processes two types of messages: + * ``` + * class ExampleActor : Actor() { + * + * suspend fun sendInt(number: Int) = act { + * println("Received $number") + * } + * + * suspend fun sendString(string: String) = act { + * println("Received $string") + * } + * } + * + * + * // Sender + * exampleActor.sendInt(42) + * ``` + * + * @param context context in which actor's job will be launched + * @param parent optional parent of actor's job + * @param start start mode of actor's job + * @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages + */ +@Suppress("EXPOSED_SUPER_CLASS") +abstract class Actor( + context: CoroutineContext = DefaultDispatcher, + parent: Job? = null, + start: CoroutineStart = CoroutineStart.LAZY, + channelCapacity: Int = 16 +) : AbstractActor Unit>(context, parent, start, channelCapacity) { + + /** + * Schedules [block] as a message to the actor mailbox. + * All messages sent via [act] will be processed sequentially in the actor context. + * Act semantics is equivalent to sending lambda to channel with receiver, which invokes + * all sent lambdas. + * + * @throws ClosedSendChannelException if actor is [closed][close] + */ + protected suspend fun act(block: suspend () -> Unit) { + job.start() + mailbox.send(block) + } + + internal override suspend fun onMessage(message: suspend () -> Unit) { + message() + } +} diff --git a/core/kotlinx-coroutines-core/src/actors/ActorTraits.kt b/core/kotlinx-coroutines-core/src/actors/ActorTraits.kt new file mode 100644 index 0000000000..7e4b7cedb8 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/actors/ActorTraits.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import kotlin.coroutines.experimental.* + +/** + * Actor traits, common for [Actor] and [TypedActor]. + * Simply speaking, actor is a high-level abstraction for [channel][ReceiveChannel] and coroutine, which + * sequentially processes messages from the channel. + * + * Actors are inspired by the Actor Model: http://en.wikipedia.org/wiki/Actor_model, + * but have slightly different semantics to expose type-safety over address transparency. + * + * Every actor has a [Job] associated with it, which lifecycle is tightly bound with actor lifecycle. + * + * Any actor has well-defined lifecycle: + * -- Not started. Note that by default actors are started [lazily][CoroutineStart.LAZY] + * -- Active. Actor is running and processing incoming messages + * -- Closing. Actor's channel is closed for new messages, but actor is processing all pending messages, + * then invokes [onClose]. Can be triggered by [close] call + * -- Closed. Actor and all its children (both actors and launched jobs) are completed, [job] is completed. + * -- Cancelled. Actor's channel is closed for new messages, its job is cancelled, pending messages are not processed and + * hang in the channel, [onClose] is invoked. Can be triggered by [cancel] call + * + * Note: + * [ActorTraits] doesn't have any variations of `send` method, because different implementations + * have different ways to expose mailbox to provide static typing. + */ +abstract class ActorTraits { + + /** + * Job identifying current actor and available from its [coroutineContext] + * + * Lifecycle: + * If job is cancelled, actor is effectively killed + * If actor is closed, job is completed as soon as all messages are processed and all launched children are completed + * If actor is cancelled, job is cancelled immediately + */ + public abstract val job: Job + + /** + * Close the actor and its channel. + * Before closing, the actor processes all pending messages and calls [onClose] + */ + public abstract fun close() + + /** + * Cancel the actor and its channel without letting the actor to process pending messages. + * This is a last ditch way to stop the actor which shouldn't be used normally. + * It's guaranteed that [onClose] will be called. + */ + public abstract fun cancel() + + /** + * Handler which is invoked when actor is started. + * Actor is started according to its [start mode][CoroutineStart]. + * This method will not be invoked is actor is started lazily and is cancelled before receiving any messages. + * If [onStart] throws an exception, actor is immediately [cancelled][cancel]. + */ + protected open suspend fun onStart() {} + + /** + * Handler which is invoked when actor is being closed or killed. + * It's guaranteed that on the moment of invocation no more messages will be processed by the actor + * and no more messages can be sent. + * This handler is invoked even if actor wasn't started to properly cleanup resources owned by the actor. + * + * Handler is invoked before associated [job] is completed or cancelled to allow graceful shutdown + * and ability to shutdown child tasks. + */ + protected open fun onClose() {} + + /** + * Waits until the actor is completed or cancelled + */ + public suspend fun join(): Unit = job.join() +} diff --git a/core/kotlinx-coroutines-core/src/actors/Actors.kt b/core/kotlinx-coroutines-core/src/actors/Actors.kt new file mode 100644 index 0000000000..c2e41edfdd --- /dev/null +++ b/core/kotlinx-coroutines-core/src/actors/Actors.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* + + +/** + * Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive] + * + * @param context context in which actor's job will be launched + * @param parent optional parent of actor's job + * @param start start mode of actor's job + * @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages + * @param block actor's message handler + */ +public fun actor( + context: CoroutineContext = DefaultDispatcher, + parent: ActorTraits, + start: CoroutineStart = CoroutineStart.LAZY, + channelCapacity: Int = 16, block: suspend TypedActor.(T) -> Unit +): TypedActor { + return actor(context, parent.job, start, channelCapacity, block) +} + +/** + * Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive] + * + * @param context context in which actor's job will be launched + * @param parent optional parent of actor's job + * @param start start mode of actor's job + * @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages + * @param block actor's message handler + */ +public fun actor( + context: CoroutineContext = DefaultDispatcher, + parent: Job? = null, + start: CoroutineStart = CoroutineStart.LAZY, + channelCapacity: Int = 16, block: suspend TypedActor.(T) -> Unit +): TypedActor { + return object : TypedActor(context, parent, start, channelCapacity) { + override suspend fun receive(message: T) { + block(message) + } + } +} diff --git a/core/kotlinx-coroutines-core/src/actors/TypedActor.kt b/core/kotlinx-coroutines-core/src/actors/TypedActor.kt new file mode 100644 index 0000000000..b3c0954416 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/actors/TypedActor.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import kotlin.coroutines.experimental.* + +/** + * [TypedActor] is the base for all stateful actors, which can process only one [type][T] of messages. + * [TypedActor] has well-defined lifecycle described in [ActorTraits]. + * [TypedActor.receive] method is used to declare a message handler, which is parametrized by [T] + * to provide better compile-type safety. + * + * Example: + * ``` + * class ExampleActor : TypedActor() { + * + * override suspend fun receive(string: String) = act { + * println("Received $string") + * } + * } + * + * // Sender + * exampleActor.send("foo") + * ``` + * + * @param T type of the message this actor can handle + * @param context context in which actor's job will be launched + * @param parent optional parent of actor's job + * @param start start mode of actor's job + * @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages + */ +@Suppress("EXPOSED_SUPER_CLASS") +abstract class TypedActor( + context: CoroutineContext = DefaultDispatcher, + parent: Job? = null, + start: CoroutineStart = CoroutineStart.LAZY, + channelCapacity: Int = 16 +) : AbstractActor(context, parent, start, channelCapacity) { + + + /** + * Sends the message to the actor, which later will be sequentially processed by [receive]. + * Sender is suspended, if actor's channel capacity is reached. This suspension is cancellable + * and has semantics similar to [SendChannel.send] + * + * @throws ClosedSendChannelException if actor is [closed][close] + */ + suspend fun send(message: T) { + job.start() + mailbox.send(message) + } + + /** + * Attempts to send message to the actor, which later will be sequentially processed by [receive]. + * Attempt is successful if actor's channel capacity restriction is not violated. + * This method is intended to be used from synchronous callbacks with [Channel.UNLIMITED] + * + * @throws ClosedSendChannelException if actor is [closed][close] + * @return `true` if offer was successful, false otherwise + */ + fun offer(message: T): Boolean { + job.start() + return mailbox.offer(message) + } + + /** + * Handler, which handles all received messages. + * + * @throws ClassCastException if actor was casted to raw type and [send] was invoked with wrong type of the argument + */ + protected abstract suspend fun receive(message: T) + + internal override suspend fun onMessage(message: T) { + receive(message) + } +} diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt index 2ef6cdd6b7..19e4842087 100644 --- a/core/kotlinx-coroutines-core/test/TestBase.kt +++ b/core/kotlinx-coroutines-core/test/TestBase.kt @@ -4,8 +4,8 @@ package kotlinx.coroutines.experimental -import org.junit.* import kotlinx.coroutines.experimental.scheduling.* +import org.junit.* import java.util.concurrent.atomic.* /** @@ -42,7 +42,7 @@ public actual open class TestBase actual constructor() { private var actionIndex = AtomicInteger() private var finished = AtomicBoolean() - private var error = AtomicReference() + private var error = AtomicReference() /** * Throws [IllegalStateException] like `error` in stdlib, but also ensures that the test will not @@ -103,7 +103,9 @@ public actual open class TestBase actual constructor() { @After fun onCompletion() { - error.get()?.let { throw it } + error.get()?.let { + throw IllegalStateException("Throwing error from onCompletion because error() was called)", it) + } check(actionIndex.get() == 0 || finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" } shutdownPoolsAfterTest() checkTestThreads(threadsBefore) diff --git a/core/kotlinx-coroutines-core/test/actors/ActorSequentialProcessingStressTest.kt b/core/kotlinx-coroutines-core/test/actors/ActorSequentialProcessingStressTest.kt new file mode 100644 index 0000000000..1480ac9b2f --- /dev/null +++ b/core/kotlinx-coroutines-core/test/actors/ActorSequentialProcessingStressTest.kt @@ -0,0 +1,96 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class ActorSequentialProcessingStressTest : TestBase() { + + private val iterations = 10_000 * stressTestMultiplier + private val senders = 4 + + private val actorContext = newSingleThreadContext("Actor Stress Test") + private val senderContext = newFixedThreadPoolContext(senders, "Actor Stress Test") + + @After + fun tearDown() { + actorContext.close() + senderContext.close() + } + + private inner class TestActor : Actor(actorContext) { + var state = 0 + private var thread: Thread? = null + + suspend fun increment() = act { + ++state + if (thread == null) { + thread = Thread.currentThread() + } else { + assertSame(thread, Thread.currentThread()) + } + } + } + + private inner class TestTypedActor : TypedActor(actorContext) { + var state = 0 + private var thread: Thread? = null + + override suspend fun receive(message: Unit) { + ++state + if (thread == null) { + thread = Thread.currentThread() + } else { + assertSame(thread, Thread.currentThread()) + } + } + } + + @Test + fun testActor() = runTest { + val startBarrier = CyclicBarrier(5) + + val actor = TestActor() + val tasks = (1..4).map { + async(senderContext) { + startBarrier.await() + repeat(iterations) { + actor.increment() + } + } + } + + startBarrier.await() + tasks.awaitAll() + actor.close() + actor.join() + assertEquals(senders * iterations * stressTestMultiplier, actor.state) + } + + @Test + fun testTypedActor() = runTest { + val startBarrier = CyclicBarrier(5) + + val actor = TestTypedActor() + val tasks = (1..4).map { + async(senderContext) { + startBarrier.await() + repeat(iterations) { + actor.send(Unit) + } + } + } + + startBarrier.await() + tasks.awaitAll() + actor.close() + actor.join() + assertEquals(senders * iterations * stressTestMultiplier, actor.state) + } +} diff --git a/core/kotlinx-coroutines-core/test/actors/ActorTest.kt b/core/kotlinx-coroutines-core/test/actors/ActorTest.kt new file mode 100644 index 0000000000..5c7946edc9 --- /dev/null +++ b/core/kotlinx-coroutines-core/test/actors/ActorTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import org.junit.Test +import java.util.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class ActorTest : TestBase() { + + private class DecomposingActor(ctx: CoroutineContext) : Actor(ctx) { + + private inner class WorkerActor(ctx: CoroutineContext, parent: ActorTraits) : Actor(ctx, parent.job) { + suspend fun onReceive(message: Int) = act { + if (message == 239) { + throw AssertionError() + } + result += message + + } + } + private val workers: List + var result: Int = 0 + + init { + workers = MutableList(2) { WorkerActor(ctx, this) } + } + + suspend fun onReceive(message: Int) = act { + if (message == 314) { + throw AssertionError() + } + workers[Random().nextInt(2)].onReceive(message) + } + + override fun onClose() { + workers.forEach { it.close() } + } + } + + @Test + fun testTransparentDecomposition() = runTest { + val actor = DecomposingActor(coroutineContext) + + for (i in 1..100) { + actor.onReceive(i) + } + + actor.close() + actor.join() + assertEquals(50 * 101, actor.result) + } + + @Test + fun testEagerChildActorFailure() = runTest(unhandled = unhandledFailures(3)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + actor.onReceive(239) + actor.join() + } + + @Test + fun testChildActorFailure() = runTest(unhandled = unhandledFailures(3)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + + for (i in 1..100) { + actor.onReceive(i) + } + + actor.onReceive(239) + actor.join() + assertEquals(50 * 101, actor.result) + } + + @Test + fun testEagerParentActorFailure() = runTest(unhandled = unhandledFailures(2)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + actor.onReceive(314) + actor.join() + } + + @Test + fun testParentActorFailure() = runTest(unhandled = unhandledFailures(2)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + for (i in 1..100) { + actor.onReceive(i) + } + + actor.onReceive(314) + actor.join() + assertEquals(50 * 101, actor.result) + } + + private fun unhandledFailures(count: Int): List<(Throwable) -> Boolean> { + return MutableList(count) { { e: Throwable -> e is AssertionError } } + } +} diff --git a/core/kotlinx-coroutines-core/test/actors/ActorsBaseTest.kt b/core/kotlinx-coroutines-core/test/actors/ActorsBaseTest.kt new file mode 100644 index 0000000000..6f05bb374d --- /dev/null +++ b/core/kotlinx-coroutines-core/test/actors/ActorsBaseTest.kt @@ -0,0 +1,304 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import org.junit.Test +import org.junit.runner.* +import org.junit.runners.* +import java.io.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +@RunWith(Parameterized::class) +class ActorsBaseTest(private val actorType: ActorType) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun params(): Collection> = + ActorType.values().map { arrayOf(it) } + } + + enum class ActorType { + Actor, // lower case for prettier test names + TypedActor + } + + private fun TestActor( + context: CoroutineContext, + capacity: Int = 2, parent: Job? = null, + whenClosed: () -> Unit = {}, + whenStarted: () -> Unit = {} + ): TestActor { + return when (actorType) { + ActorType.Actor -> ActTestActor(context.minusKey(Job), capacity, parent, whenClosed, whenStarted) + ActorType.TypedActor -> TypedTestActor(context.minusKey(Job), capacity, parent, whenClosed, whenStarted) + } + } + + // Common interface for tests + private interface TestActor { + suspend fun expectedSequence(expected: Int) + suspend fun fail() + + public fun close() + public fun cancel() + public suspend fun join() + } + + private inner class TypedTestActor( + context: CoroutineContext, + capacity: Int = 2, + parent: Job? = null, + private val whenClosed: () -> Unit = {}, + private val whenStarted: () -> Unit = {} + ) : TestActor, Actor(context, parent, channelCapacity = capacity) { + + private var isClosed = false + + override suspend fun expectedSequence(expected: Int) = act { + expect(expected) + } + + override suspend fun fail() = act { + throw IOException() + } + + override suspend fun onStart() { + whenStarted() + } + + override fun onClose() { + assertFalse(isClosed) + isClosed = true + whenClosed() + } + } + + private inner class ActTestActor( + context: CoroutineContext, + capacity: Int = 2, + parent: Job? = null, + private val whenClosed: () -> Unit = {}, + private val whenStarted: () -> Unit = {} + ) : TypedActor(context, parent, channelCapacity = capacity), TestActor { + + private lateinit var launchedJob: Job + private var isClosed = false + + override suspend fun receive(message: Any) { + when (message) { + is Int -> expect(message) + is Throwable -> throw message + else -> { + launchedJob = launch(coroutineContext) { while (true) yield() } + } + } + } + + override suspend fun onStart() { + whenStarted() + } + + override suspend fun expectedSequence(expected: Int) = send(expected) + + override suspend fun fail() = send(IOException()) + + override fun onClose() { + assertFalse(isClosed) + isClosed = true + whenClosed() + } + } + + @Test + fun testClose() = runTest { + val actor = TestActor(coroutineContext, 4) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + actor.close() + actor.join() + finish(4) + } + + @Test + fun testOnClose() = runTest { + val actor = TestActor(coroutineContext, 4, whenClosed = { expect(4) }) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + actor.close() + actor.join() + finish(5) + } + + @Test + fun testExternalJob() = runTest { + val job = Job() + val actor = TestActor(coroutineContext, parent = job, capacity = 1, whenClosed = { expect(6) }) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + expect(4) + actor.expectedSequence(5) + job.cancel() + actor.join() + finish(7) + } + + @Test + fun testExternalJobCancellation() = runTest( + unhandled = unhandledFailures(3) + ) { + + val job = launch(coroutineContext.minusKey(Job)) { + expect(2) + while (true) { + yield() + } + } + + val actor = TestActor(coroutineContext, parent = job, capacity = 1) + expect(1) + actor.expectedSequence(3) + yield() + actor.fail() + actor.join() + assertTrue(job.isCompleted) + finish(4) + } + + @Test + fun testExternalJobWithException() = runTest { + val job = Job() + val actor = TestActor(coroutineContext, parent = job, capacity = 1, whenClosed = { expect(6) }) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + expect(4) + actor.expectedSequence(5) + job.cancel(IOException()) + actor.join() + finish(7) + } + + @Test + fun testCloseWithExternalJob() = runTest { + val job = Job() + val actor = TestActor(coroutineContext, parent = job, capacity = 1, whenClosed = { expect(6) }) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + expect(4) + actor.expectedSequence(5) + actor.close() + actor.join() + finish(7) + } + + @Test + fun testCancel() = runTest { + val actor = TestActor(coroutineContext, 4) + expect(1) + actor.expectedSequence(2) + actor.expectedSequence(3) + actor.cancel() + actor.join() + finish(2) + } + + @Test + fun testOnCloseCancel() = runTest { + val actor = TestActor(coroutineContext, 4, whenClosed = { expect(3) }) + expect(1) + actor.expectedSequence(2) + yield() + actor.cancel() + actor.join() + finish(4) + } + + @Test + fun testOnCloseCancelNotStarted() = runTest { + val actor = TestActor(coroutineContext, 4, whenClosed = { expect(2) }) + expect(1) + actor.expectedSequence(2) // is not invoked + actor.expectedSequence(3) // is not invoked + actor.cancel() + actor.join() + finish(3) + } + + @Test + fun testOnCloseCloseNotStarted() = runTest { + val actor = TestActor(coroutineContext, 4, whenClosed = { expect(2) }) + expect(1) + actor.close() + actor.join() + finish(3) + } + + @Test + fun testClosedActorThrows() = runTest(expected = { it is ClosedSendChannelException }) { + val actor = TestActor(coroutineContext) + actor.close() + actor.expectedSequence(1) + expectUnreached() + } + + @Test + fun testOnStart() = runTest { + val actor = TestActor(coroutineContext, whenStarted = { expect(1) }) + actor.expectedSequence(2) + actor.close() + actor.join() + finish(3) + } + + @Test + fun testOnStartNotCalled() = runTest { + val actor = TestActor(coroutineContext, whenStarted = { expectUnreached() }) + actor.cancel() + actor.join() + finish(1) + } + + @Test + fun testOnStartThrowing() = runTest(unhandled = unhandledFailures(2)) { + val actor = TestActor(coroutineContext.minusKey(Job), whenStarted = { throw IOException() }) + actor.join() + } + + @Test + fun testActorUnhandledExceptions() = runTest( + expected = { it is ClosedSendChannelException }, + unhandled = unhandledFailures(2) + ) { + val actor = TestActor(coroutineContext) + actor.fail() + yield() + actor.expectedSequence(1) + expectUnreached() + } + + @Test + fun testConflated() = runTest { + val actor = TestActor(coroutineContext, capacity = Channel.CONFLATED) + actor.expectedSequence(42) + actor.expectedSequence(-1) + actor.expectedSequence(1) + yield() + actor.close() + actor.join() + finish(2) + } + + private fun unhandledFailures(count: Int): List<(Throwable) -> Boolean> { + return MutableList(count) { { e: Throwable -> e is IOException } } + } +} diff --git a/core/kotlinx-coroutines-core/test/actors/TypedActorTest.kt b/core/kotlinx-coroutines-core/test/actors/TypedActorTest.kt new file mode 100644 index 0000000000..30c41db0f2 --- /dev/null +++ b/core/kotlinx-coroutines-core/test/actors/TypedActorTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental.actors + +import kotlinx.coroutines.experimental.* +import org.junit.Test +import java.util.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class TypedActorTest : TestBase() { + + private class DecomposingActor(ctx: CoroutineContext) : TypedActor(ctx) { + + private val workers: List> + var result: Int = 0 + + init { + workers = MutableList(2) { _ -> + actor(ctx, parent = this) { i -> + if (i == 239) throw AssertionError() + result += i + } + } + } + + override suspend fun receive(message: Int) { + if (message == 314) { + throw AssertionError() + } + workers[Random().nextInt(2)].send(message) + } + + override fun onClose() { + workers.forEach { it.close() } + } + } + + @Test + fun testTransparentDecomposition() = runTest { + val actor = DecomposingActor(coroutineContext) + + for (i in 1..100) { + actor.send(i) + } + + actor.close() + actor.join() + assertEquals(50 * 101, actor.result) + } + + @Test + fun testEagerChildActorFailure() = runTest(unhandled = unhandledFailures(3)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + actor.send(239) + actor.join() + } + + @Test + fun testChildActorFailure() = runTest(unhandled = unhandledFailures(3)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + + for (i in 1..100) { + actor.send(i) + } + + actor.send(239) + actor.join() + assertEquals(50 * 101, actor.result) + } + + @Test + fun testEagerParentActorFailure() = runTest(unhandled = unhandledFailures(2)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + actor.send(314) + actor.join() + } + + @Test + fun testParentActorFailure() = runTest(unhandled = unhandledFailures(2)) { + val actor = DecomposingActor(coroutineContext.minusKey(Job)) + for (i in 1..100) { + actor.send(i) + } + + actor.send(314) + actor.join() + assertEquals(50 * 101, actor.result) + } + + @Test + fun testOffer() = runTest { + val actor = actor(coroutineContext, channelCapacity = 2) { expect(it) } + expect(1) + assertTrue(actor.offer(3)) + assertTrue(actor.offer(4)) + assertFalse(actor.offer(5)) + expect(2) + actor.close() + actor.join() + finish(5) + } + + private fun unhandledFailures(count: Int): List<(Throwable) -> Boolean> { + return MutableList(count) { { e: Throwable -> e is AssertionError } } + } +} diff --git a/core/kotlinx-coroutines-core/test/guide/example-actors-01.kt b/core/kotlinx-coroutines-core/test/guide/example-actors-01.kt new file mode 100644 index 0000000000..f5c6df550b --- /dev/null +++ b/core/kotlinx-coroutines-core/test/guide/example-actors-01.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. +package kotlinx.coroutines.experimental.guide.actors01 + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.actors.* +import kotlinx.coroutines.experimental.guide.sync01.* + +// Message types for counter actor +sealed class CounterMsg +object IncCounter : CounterMsg() // one-way message to increment counter +class GetCounter(val response: CompletableDeferred) : CounterMsg() // a request with reply + +class CountingActor : TypedActor() { + + private var counter: Int = 0 + + override suspend fun onStart() { + println("CountingActor started") + } + + override suspend fun receive(message: CounterMsg) { + when (message) { + is IncCounter -> counter++ + is GetCounter -> message.response.complete(counter) + } + } +} + +fun main(args: Array) = runBlocking { + val counter = CountingActor() // create the actor + println("Preparing to send a lot of inc requests") + massiveRun(CommonPool) { + counter.send(IncCounter) + } + + // send a message to get a counter value from an actor + val response = CompletableDeferred() + counter.send(GetCounter(response)) + println("Counter = ${response.await()}") + counter.close() + counter.join() // shutdown the actor and wait for it +} diff --git a/core/kotlinx-coroutines-core/test/guide/example-actors-02.kt b/core/kotlinx-coroutines-core/test/guide/example-actors-02.kt new file mode 100644 index 0000000000..405de2c694 --- /dev/null +++ b/core/kotlinx-coroutines-core/test/guide/example-actors-02.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. +package kotlinx.coroutines.experimental.guide.actors02 + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.actors.* +import kotlinx.coroutines.experimental.guide.sync01.* + +class CountingActor : Actor() { + + private var counter: Int = 0 + + override suspend fun onStart() { + println("CountingActor started") + } + + suspend fun increment() = act { // <- note act {} extension + counter++ + } + + suspend fun counter(response: CompletableDeferred) = act { + response.complete(counter) + } +} + +fun main(args: Array) = runBlocking { + val counter = CountingActor() // create the actor + println("Preparing to send a lot of inc requests") + massiveRun(CommonPool) { + counter.increment() + } + + val response = CompletableDeferred() + counter.counter(response) + println("Counter = ${response.await()}") + counter.close() + counter.join() // shutdown the actor and wait for it +} diff --git a/core/kotlinx-coroutines-core/test/guide/test/GuideTest.kt b/core/kotlinx-coroutines-core/test/guide/test/GuideTest.kt index a49ed3055f..f153ea6fcd 100644 --- a/core/kotlinx-coroutines-core/test/guide/test/GuideTest.kt +++ b/core/kotlinx-coroutines-core/test/guide/test/GuideTest.kt @@ -440,8 +440,20 @@ class GuideTest { } @Test - fun testKotlinxCoroutinesExperimentalGuideSync07() { - test("KotlinxCoroutinesExperimentalGuideSync07") { kotlinx.coroutines.experimental.guide.sync07.main(emptyArray()) }.verifyLinesArbitraryTime( + fun testKotlinxCoroutinesExperimentalGuideActors01() { + test("KotlinxCoroutinesExperimentalGuideActors01") { kotlinx.coroutines.experimental.guide.actors01.main(emptyArray()) }.verifyLinesArbitraryTime( + "Preparing to send a lot of inc requests", + "CountingActor started", + "Completed 1000000 actions in xxx ms", + "Counter = 1000000" + ) + } + + @Test + fun testKotlinxCoroutinesExperimentalGuideActors02() { + test("KotlinxCoroutinesExperimentalGuideActors02") { kotlinx.coroutines.experimental.guide.actors02.main(emptyArray()) }.verifyLinesArbitraryTime( + "Preparing to send a lot of inc requests", + "CountingActor started", "Completed 1000000 actions in xxx ms", "Counter = 1000000" ) diff --git a/coroutines-guide.md b/coroutines-guide.md index a1e1319925..784ef2d29f 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -85,7 +85,10 @@ You need to add a dependency on `kotlinx-coroutines-core` module as explained * [Thread confinement fine-grained](#thread-confinement-fine-grained) * [Thread confinement coarse-grained](#thread-confinement-coarse-grained) * [Mutual exclusion](#mutual-exclusion) - * [Actors](#actors) +* [Actors](#actors) + * [Implementing simple actor](#implementing-simple-actor) + * [Typesafe actors](#typesafe-actors) + * [Actors API](#actors-api) * [Select expression](#select-expression) * [Selecting from channels](#selecting-from-channels) * [Selecting on close](#selecting-on-close) @@ -2022,79 +2025,164 @@ The locking in this example is fine-grained, so it pays the price. However, it i where you absolutely must modify some shared state periodically, but there is no natural thread that this state is confined to. -### Actors +## Actors An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, -and a channel to communicate with other coroutines. A simple actor can be written as a function, -but an actor with a complex state is better suited for a class. +and a channel to receive messages from other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class. +For the sake of simplicity, actor may be considered as a channel and a job, which receives and processes messages from the channel in a sequential manner. +If you are familiar with [Active Object](https://en.wikipedia.org/wiki/Active_object) pattern, then you may notice that actors are very similar to it. +`kotlinx.coroutines` actors differ from classic Hewitt's actors such as ones from Erlang or Akka to provide simpler programming model, but with restricted capabilities if compare with classic actors model. -There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its -scope to receive messages from and combines the send channel into the resulting job object, so that a -single reference to the actor can be carried around as its handle. -The first step of using an actor is to define a class of messages that an actor is going to process. +### Implementing simple actor + +There is an [TypedActor] base class and shortcut builder [actor] to declare actors which can process only one type of messages. Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose. -We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message -to get its value. The later needs to send a response. A [CompletableDeferred] communication -primitive, that represents a single value that will be known (communicated) in the future, -is used here for that purpose. + +Let's start with example and write counting actor. We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message +to get its value. The later needs to send a response. A [CompletableDeferred] communication primitive, + that represents a single value that will be known (communicated) in the future, is used here for that purpose. + + ```kotlin -// Message types for counterActor +// Message types for counter actor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred) : CounterMsg() // a request with reply -``` - -Then we define a function that launches an actor using an [actor] coroutine builder: -```kotlin -// This function launches a new counter actor -fun counterActor() = actor { - var counter = 0 // actor state - for (msg in channel) { // iterate over incoming messages - when (msg) { +class CountingActor : TypedActor() { + + private var counter: Int = 0 + + override suspend fun onStart() { + println("CountingActor started") + } + + override suspend fun receive(message: CounterMsg) { + when (message) { is IncCounter -> counter++ - is GetCounter -> msg.response.complete(counter) + is GetCounter -> message.response.complete(counter) } } } ``` -The main code is straightforward: +The main code is straightforward, messages can be sent to the actor using `send` method: ```kotlin fun main(args: Array) = runBlocking { - val counter = counterActor() // create the actor + val counter = CountingActor() // create the actor + println("Preparing to send a lot of inc requests") massiveRun(CommonPool) { counter.send(IncCounter) } + // send a message to get a counter value from an actor val response = CompletableDeferred() counter.send(GetCounter(response)) println("Counter = ${response.await()}") - counter.close() // shutdown the actor + counter.close() + counter.join() // shutdown the actor and wait for it } ``` -> You can get full code [here](core/kotlinx-coroutines-core/test/guide/example-sync-07.kt) +> You can get full code [here](core/kotlinx-coroutines-core/test/guide/example-actors-01.kt) -It does not matter (for correctness) what context the actor itself is executed in. An actor is +Note that the same actor, but without `onStart` handler, can be implemented using more compact [actor] builder: +```kolin +fun createCountingActor(): TypedActor = actor { message -> + when (message) { + is IncCounter -> counter++ + is GetCounter -> message.response.complete(counter) + } +} +``` + + `counter` is not protected by any synchronization, because actor processes messages sequentially one by one. +It does not matter for correctness what context the actor itself is executed in. An actor is a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state, but can only affect each other through messages (avoiding the need for any locks). Actor is more efficient than locking under load, because in this case it always has work to do and it does not have to switch to a different context at all. -> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated - with the channel that it receives messages from, while a producer is associated with the channel that it - sends elements to. +### Typesafe actors +What is usually counterintuitive about Actor-based API is the absence of meaningful method names and requirement to introduce `sealed` classes for every actor +which processes more than one type of messages. And in any real-world project it's very likely that any actor can process more than only one kind of requests. +For this purposes we have [Actor] base class. With [Actor], any method with any arguments can act like `send` method of `TypedActor`, the only requirement is to define methods +using [act][Actor.act]. + +Let's implement the same `CountingActor` as in the previous example; + + + +```kotlin +class CountingActor : Actor() { + + private var counter: Int = 0 + + override suspend fun onStart() { + println("CountingActor started") + } + + suspend fun increment() = act { // <- note act {} extension + counter++ + } + + suspend fun counter(response: CompletableDeferred) = act { + response.complete(counter) + } +} + +fun main(args: Array) = runBlocking { + val counter = CountingActor() // create the actor + println("Preparing to send a lot of inc requests") + massiveRun(CommonPool) { + counter.increment() + } + + val response = CompletableDeferred() + counter.counter(response) + println("Counter = ${response.await()}") + counter.close() + counter.join() // shutdown the actor and wait for it +} +``` + +> You can get full code [here](core/kotlinx-coroutines-core/test/guide/example-actors-02.kt) + + + +The idea behind `act` is to put to the actor mailbox not the arguments of the message, but `act`'s lambda with all captured method parameters. From API behaviour standpoint, `Actor` behave the same way as +`TypedActor` -- all methods are invoked in the context of the actor sequentially. + +### Actors API +From previous paragraphs it may look like actor API is very limited. But both [Actor] and [TypedActor] come with a lot of handy parameters, such as underlying channel capacity, +context to be executed in, start and close hooks. The whole description of extension points and configuration parameters can be found in [ActorTraits] interface and constructors of corresponding classes. +Actors are started lazily and always have an underlying [Job] associated with it, available in their `coroutineContext`, thus actors actors can transparently launch more actors +or jobs (even in another context), which will be organized into parent-child relationship. + + ## Select expression Select expression makes it possible to await multiple suspending functions simultaneously and _select_ @@ -2515,4 +2603,9 @@ Channel was closed [SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html + +[TypedActor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.actors/-typed-actor/index.html +[Actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.actors/-actor/index.html +[Actor.act]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.actors/-actor/act.html +[ActorTraits]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.actors/-actor-traits/index.html