-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBuilders.common.kt
260 lines (236 loc) · 11.5 KB
/
Builders.common.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("BuildersKt")
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*
// --------------- basic coroutine builders ---------------
/**
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [coroutineContext] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join].
*
* Uncaught exceptions in this coroutine cancel parent job in the context by default
* (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
* the context of another coroutine, then any uncaught exception leads to the cancellation of parent coroutine.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code which will be invoked in the context of the provided scope.
**/
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}
/**
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
* @suppress **Deprecated** Use [CoroutineScope.launch] instead.
*/
@Deprecated(
message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
replaceWith = ReplaceWith("GlobalScope.launch(context, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
)
public fun launch(
context: CoroutineContext = Dispatchers.Default,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> Unit
): Job =
GlobalScope.launch(context, start, onCompletion, block)
/**
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
* @suppress **Deprecated** Use [CoroutineScope.launch] instead.
*/
@Deprecated(
message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
replaceWith = ReplaceWith("GlobalScope.launch(context + parent, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
)
public fun launch(
context: CoroutineContext = Dispatchers.Default,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null, // nullable for binary compatibility
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> Unit
): Job =
GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, onCompletion, block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
context: CoroutineContext = Dispatchers.Default,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
): Job =
GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, block = block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
context: CoroutineContext = Dispatchers.Default,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job =
GlobalScope.launch(context, start, block = block)
/**
* @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
*/
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun launch(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> Unit): Job =
GlobalScope.launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
* the result.
*
* This function immediately applies dispatcher from the new context, shifting execution of the block into the
* different thread inside the block, and back when it completes.
* The specified [context] is added onto the current coroutine context for the execution of the block.
*
* An optional `start` parameter is used only if the specified `context` uses a different [CoroutineDispatcher] than
* a current one, otherwise it is ignored.
* By default, the coroutine is immediately scheduled for execution and can be cancelled
* while it is waiting to be executed and it can be cancelled while the result is scheduled
* to be processed by the invoker context.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
* A value of [CoroutineStart.LAZY] is not supported and produces [IllegalArgumentException].
*/
public suspend fun <T> withContext(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): T =
// todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
withContextImpl(context, start) {
currentScope {
block()
}
}
// todo: optimize it to reduce allocations
private suspend fun <T> withContextImpl(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
// fast path #1 if there is no change in the actual context:
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
// compute new context
val newContext = oldContext + context
// fast path #2 if the result is actually the same
if (newContext === oldContext)
return@sc block.startCoroutineUninterceptedOrReturn(uCont)
// fast path #3 if the new dispatcher is the same as the old one.
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val newContinuation = RunContinuationUnintercepted(newContext, uCont)
// There are some other changes in the context, so this thread needs to be updated
withCoroutineContext(newContext) {
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
}
}
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
require(!start.isLazy) { "$start start is not supported" }
val completion = RunCompletion(
context = newContext,
delegate = uCont.intercepted(), // delegate to continuation intercepted with old dispatcher on completion
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
)
completion.initParentJobInternal(newContext[Job]) // attach to job
start(block, completion)
completion.getResult()
}
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
@JvmName("withContext")
public suspend fun <T> withContext0(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T =
withContextImpl(context, start, block)
/** @suppress **Deprecated**: Renamed to [withContext]. */
@Deprecated(message = "Renamed to `withContext`", level=DeprecationLevel.WARNING,
replaceWith = ReplaceWith("withContext(context, start, block)"))
public suspend fun <T> run(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T =
withContextImpl(context, start, block)
/** @suppress **Deprecated** */
@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
withContextImpl(context, start = CoroutineStart.ATOMIC, block = block)
// --------------- implementation ---------------
private open class StandaloneCoroutine(
private val parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
override fun handleJobException(exception: Throwable) {
handleCoroutineException(parentContext, exception, this)
}
override fun onFinishingInternal(update: Any?) {
if (update is CompletedExceptionally && update.cause !is CancellationException) {
parentContext[Job]?.cancel(update.cause)
}
}
}
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
private val block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
override fun onStart() {
block.startCoroutineCancellable(this, this)
}
}
private class RunContinuationUnintercepted<in T>(
override val context: CoroutineContext,
private val continuation: Continuation<T>
): Continuation<T> {
override fun resume(value: T) {
withCoroutineContext(continuation.context) {
continuation.resume(value)
}
}
override fun resumeWithException(exception: Throwable) {
withCoroutineContext(continuation.context) {
continuation.resumeWithException(exception)
}
}
}
@Suppress("UNCHECKED_CAST")
private class RunCompletion<in T>(
override val context: CoroutineContext,
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode) {
override val useCancellingState: Boolean get() = true
}