Skip to content

Introduce typesafe actors abstraction #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,39 @@ public final class kotlinx/coroutines/experimental/YieldKt {
public static final fun yield (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/actors/Actor {
public fun <init> ()V
public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V
public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun act (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/actors/ActorTraits {
public fun <init> ()V
public abstract fun cancel ()V
public abstract fun close ()V
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
public final fun join (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
protected fun onClose ()V
protected fun onStart (Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/experimental/actors/ActorsKt {
public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static final fun actor (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor;
public static synthetic fun actor$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/actors/ActorTraits;Lkotlinx/coroutines/experimental/CoroutineStart;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/actors/TypedActor;
}

public abstract class kotlinx/coroutines/experimental/actors/TypedActor {
public fun <init> ()V
public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;I)V
public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;Lkotlinx/coroutines/experimental/CoroutineStart;IILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun offer (Ljava/lang/Object;)Z
protected abstract fun receive (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public final fun send (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel {
public fun <init> ()V
public fun cancel (Ljava/lang/Throwable;)Z
Expand Down
76 changes: 76 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/AbstractActor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* Base class for actors implementation, which provides implementation for [ActorTraits]
* This class is not designed to be extended outside of kotlinx.coroutines, so it's internal
*
* @param T type of messages which are stored in the mailbox
*/
internal abstract class AbstractActor<T>(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : ActorTraits() {

internal val mailbox = Channel<T>(channelCapacity)
public final override val job: Job = launch(context, start, parent) { actorLoop() }

/*
* Guard for onClose.
* It's necessary to invoke onClose in the end of actor body even when we have job completion:
* if actor decides to decompose its work, then onClose should be called *before* actor's body end,
* otherwise delegated work will never be closed, because job completion will await all created children
* to complete
*/
private val onCloseInvoked = atomic(0)

// Save an allocation
private inner class OnCloseNode : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) {
if (onCloseInvoked.compareAndSet(0, 1)) {
onClose()
}
}
}

init {
job.invokeOnCompletion(OnCloseNode())
}

public override fun close() {
mailbox.close()
}

public override fun cancel() {
job.cancel()
mailbox.cancel()
}

private suspend fun actorLoop() {
try {
onStart()
for (message in mailbox) {
onMessage(message)
}
} catch (e: Throwable) {
handleCoroutineException(coroutineContext, e)
} finally {
mailbox.close()
if (onCloseInvoked.compareAndSet(0, 1)) {
onClose()
}
}
}

internal abstract suspend fun onMessage(message: T)
}
65 changes: 65 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/Actor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* [Actor] is the base for all stateful actors, who have to process more than one type of messages.
* [Actor] has well-defined lifecycle described in [ActorTraits].
*
* To declare message handler, actor should have methods declared using [act],
* which are used to send message "Send message which handler invokes `act` body"
*
* Example, where the actor asynchronously processes two types of messages:
* ```
* class ExampleActor : Actor() {
*
* suspend fun sendInt(number: Int) = act {
* println("Received $number")
* }
*
* suspend fun sendString(string: String) = act {
* println("Received $string")
* }
* }
*
*
* // Sender
* exampleActor.sendInt(42)
* ```
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
*/
@Suppress("EXPOSED_SUPER_CLASS")
abstract class Actor(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : AbstractActor<suspend () -> Unit>(context, parent, start, channelCapacity) {

/**
* Schedules [block] as a message to the actor mailbox.
* All messages sent via [act] will be processed sequentially in the actor context.
* Act semantics is equivalent to sending lambda to channel with receiver, which invokes
* all sent lambdas.
*
* @throws ClosedSendChannelException if actor is [closed][close]
*/
protected suspend fun act(block: suspend () -> Unit) {
job.start()
mailbox.send(block)
}

internal override suspend fun onMessage(message: suspend () -> Unit) {
message()
}
}
82 changes: 82 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/ActorTraits.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* Actor traits, common for [Actor] and [TypedActor].
* Simply speaking, actor is a high-level abstraction for [channel][ReceiveChannel] and coroutine, which
* sequentially processes messages from the channel.
*
* Actors are inspired by the Actor Model: <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>,
* but have slightly different semantics to expose type-safety over address transparency.
*
* Every actor has a [Job] associated with it, which lifecycle is tightly bound with actor lifecycle.
*
* Any actor has well-defined lifecycle:
* -- Not started. Note that by default actors are started [lazily][CoroutineStart.LAZY]
* -- Active. Actor is running and processing incoming messages
* -- Closing. Actor's channel is closed for new messages, but actor is processing all pending messages,
* then invokes [onClose]. Can be triggered by [close] call
* -- Closed. Actor and all its children (both actors and launched jobs) are completed, [job] is completed.
* -- Cancelled. Actor's channel is closed for new messages, its job is cancelled, pending messages are not processed and
* hang in the channel, [onClose] is invoked. Can be triggered by [cancel] call
*
* Note:
* [ActorTraits] doesn't have any variations of `send` method, because different implementations
* have different ways to expose mailbox to provide static typing.
*/
abstract class ActorTraits {

/**
* Job identifying current actor and available from its [coroutineContext]
*
* Lifecycle:
* If job is cancelled, actor is effectively killed
* If actor is closed, job is completed as soon as all messages are processed and all launched children are completed
* If actor is cancelled, job is cancelled immediately
*/
public abstract val job: Job

/**
* Close the actor and its channel.
* Before closing, the actor processes all pending messages and calls [onClose]
*/
public abstract fun close()

/**
* Cancel the actor and its channel without letting the actor to process pending messages.
* This is a last ditch way to stop the actor which shouldn't be used normally.
* It's guaranteed that [onClose] will be called.
*/
public abstract fun cancel()

/**
* Handler which is invoked when actor is started.
* Actor is started according to its [start mode][CoroutineStart].
* This method will not be invoked is actor is started lazily and is cancelled before receiving any messages.
* If [onStart] throws an exception, actor is immediately [cancelled][cancel].
*/
protected open suspend fun onStart() {}

/**
* Handler which is invoked when actor is being closed or killed.
* It's guaranteed that on the moment of invocation no more messages will be processed by the actor
* and no more messages can be sent.
* This handler is invoked even if actor wasn't started to properly cleanup resources owned by the actor.
*
* Handler is invoked before associated [job] is completed or cancelled to allow graceful shutdown
* and ability to shutdown child tasks.
*/
protected open fun onClose() {}

/**
* Waits until the actor is completed or cancelled
*/
public suspend fun join(): Unit = job.join()
}
49 changes: 49 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/Actors.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*


/**
* Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive]
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
* @param block actor's message handler
*/
public fun <T> actor(
context: CoroutineContext = DefaultDispatcher,
parent: ActorTraits,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16, block: suspend TypedActor<T>.(T) -> Unit
): TypedActor<T> {
return actor(context, parent.job, start, channelCapacity, block)
}

/**
* Creates a new [TypedActor] with given [block] as [message handler][TypedActor.receive]
*
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
* @param block actor's message handler
*/
public fun <T> actor(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16, block: suspend TypedActor<T>.(T) -> Unit
): TypedActor<T> {
return object : TypedActor<T>(context, parent, start, channelCapacity) {
override suspend fun receive(message: T) {
block(message)
}
}
}
80 changes: 80 additions & 0 deletions core/kotlinx-coroutines-core/src/actors/TypedActor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.actors

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.*

/**
* [TypedActor] is the base for all stateful actors, which can process only one [type][T] of messages.
* [TypedActor] has well-defined lifecycle described in [ActorTraits].
* [TypedActor.receive] method is used to declare a message handler, which is parametrized by [T]
* to provide better compile-type safety.
*
* Example:
* ```
* class ExampleActor : TypedActor<String>() {
*
* override suspend fun receive(string: String) = act {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except I still need to use act right?

No, TypedActor doesn't have that method

Then I guess this example is wrong and act is not needed here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, documentation leftover

* println("Received $string")
* }
* }
*
* // Sender
* exampleActor.send("foo")
* ```
*
* @param T type of the message this actor can handle
* @param context context in which actor's job will be launched
* @param parent optional parent of actor's job
* @param start start mode of actor's job
* @param channelCapacity capacity of actor's mailbox aka maximum count of pending messages
*/
@Suppress("EXPOSED_SUPER_CLASS")
abstract class TypedActor<T>(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
start: CoroutineStart = CoroutineStart.LAZY,
channelCapacity: Int = 16
) : AbstractActor<T>(context, parent, start, channelCapacity) {


/**
* Sends the message to the actor, which later will be sequentially processed by [receive].
* Sender is suspended, if actor's channel capacity is reached. This suspension is cancellable
* and has semantics similar to [SendChannel.send]
*
* @throws ClosedSendChannelException if actor is [closed][close]
*/
suspend fun send(message: T) {
job.start()
mailbox.send(message)
}

/**
* Attempts to send message to the actor, which later will be sequentially processed by [receive].
* Attempt is successful if actor's channel capacity restriction is not violated.
* This method is intended to be used from synchronous callbacks with [Channel.UNLIMITED]
*
* @throws ClosedSendChannelException if actor is [closed][close]
* @return `true` if offer was successful, false otherwise
*/
fun offer(message: T): Boolean {
job.start()
return mailbox.offer(message)
}

/**
* Handler, which handles all received messages.
*
* @throws ClassCastException if actor was casted to raw type and [send] was invoked with wrong type of the argument
*/
protected abstract suspend fun receive(message: T)

internal override suspend fun onMessage(message: T) {
receive(message)
}
}
Loading