-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCopyPast.kt
200 lines (158 loc) · 6.69 KB
/
CopyPast.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package io.ktor.util.pipeline
import io.ktor.util.KtorDsl
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.jvm.internal.CoroutineStackFrame
import kotlin.coroutines.resumeWithException
// THIS IS JUST COPYPAST TO GET COMPILED WITH internal SuspendFunctionGun
internal typealias PipelineInterceptorFunction<TSubject, TContext> =
(PipelineContext<TSubject, TContext>, TSubject, Continuation<Unit>) -> Any?
internal object StackWalkingFailedFrame : CoroutineStackFrame, Continuation<Nothing> {
override val callerFrame: CoroutineStackFrame? get() = null
override fun getStackTraceElement(): StackTraceElement? {
return null
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Nothing>) {
""
}
}
@KtorDsl
public abstract class PipelineContext<TSubject : Any, TContext : Any>(
public val context: TContext
) : CoroutineScope {
/**
* Subject of this pipeline execution that goes along the pipeline
*/
public abstract var subject: TSubject
/**
* Finishes current pipeline execution
*/
public abstract fun finish()
/**
* Continues execution of the pipeline with the given subject
*/
public abstract suspend fun proceedWith(subject: TSubject): TSubject
/**
* Continues execution of the pipeline with the same subject
*/
public abstract suspend fun proceed(): TSubject
internal abstract suspend fun execute(initial: TSubject): TSubject
}
class SuspendFunctionGun<TSubject : Any, TContext : Any>(
initial: TSubject,
context: TContext,
private val blocks: List<PipelineInterceptorFunction<TSubject, TContext>>
) : io.ktor.util.pipeline.PipelineContext<TSubject, TContext>(context) {
override val coroutineContext: CoroutineContext get() = continuation.context
// this is impossible to inline because of property name clash
// between PipelineContext.context and Continuation.context
private val continuation: Continuation<Unit> = object : Continuation<Unit>, CoroutineStackFrame {
override val callerFrame: CoroutineStackFrame? get() = peekContinuation() as? CoroutineStackFrame
var currentIndex: Int = Int.MIN_VALUE
override fun getStackTraceElement(): StackTraceElement? = null
private fun peekContinuation(): Continuation<*>? {
if (currentIndex == Int.MIN_VALUE) currentIndex = lastSuspensionIndex
if (currentIndex < 0) {
currentIndex = Int.MIN_VALUE
return null
}
// this is only invoked by debug agent during job state probes
// lastPeekedIndex is non-volatile intentionally
// and the list of continuations is not synchronized too
// so this is not guaranteed to work properly (may produce incorrect trace),
// but the only we care is to not crash here
// and simply return StackWalkingFailedFrame on any unfortunate accident
try {
val result = suspensions[currentIndex] ?: return io.ktor.util.pipeline.StackWalkingFailedFrame
currentIndex -= 1
return result
} catch (_: Throwable) {
return io.ktor.util.pipeline.StackWalkingFailedFrame
}
}
override val context: CoroutineContext
get() = suspensions[lastSuspensionIndex]?.context ?: error("Not started")
override fun resumeWith(result: Result<Unit>) {
if (result.isFailure) {
resumeRootWith(Result.failure(result.exceptionOrNull()!!))
return
}
loop(false)
}
}
override var subject: TSubject = initial
private val suspensions: Array<Continuation<TSubject>?> = arrayOfNulls(blocks.size)
private var lastSuspensionIndex: Int = -1
private var index = 0
override fun finish() {
index = blocks.size
}
override suspend fun proceed(): TSubject = suspendCoroutineUninterceptedOrReturn { continuation ->
if (index == blocks.size) return@suspendCoroutineUninterceptedOrReturn subject
addContinuation(continuation)
if (loop(true)) {
discardLastRootContinuation()
return@suspendCoroutineUninterceptedOrReturn subject
}
COROUTINE_SUSPENDED
}
override suspend fun proceedWith(subject: TSubject): TSubject {
this.subject = subject
return proceed()
}
override suspend fun execute(initial: TSubject): TSubject {
index = 0
if (index == blocks.size) return initial
subject = initial
if (lastSuspensionIndex >= 0) throw IllegalStateException("Already started")
return proceed()
}
/**
* @return `true` if it is possible to return result immediately
*/
private fun loop(direct: Boolean): Boolean {
do {
val currentIndex = index // it is important to read index every time
if (currentIndex == blocks.size) {
if (!direct) {
resumeRootWith(Result.success(subject))
return false
}
return true
}
index = currentIndex + 1 // it is important to increase it before function invocation
val next = blocks[currentIndex]
try {
val result = next(this, subject, continuation)
if (result === COROUTINE_SUSPENDED) return false
} catch (cause: Throwable) {
resumeRootWith(Result.failure(cause))
return false
}
} while (true)
}
private fun resumeRootWith(result: Result<TSubject>) {
if (lastSuspensionIndex < 0) error("No more continuations to resume")
val next = suspensions[lastSuspensionIndex]!!
suspensions[lastSuspensionIndex--] = null
if (!result.isFailure) {
next.resumeWith(result)
} else {
// val exception = recoverStackTraceBridge(result.exceptionOrNull()!!, next)
next.resumeWithException(result.exceptionOrNull()!!)
}
}
private fun discardLastRootContinuation() {
if (lastSuspensionIndex < 0) throw IllegalStateException("No more continuations to resume")
suspensions[lastSuspensionIndex--] = null
}
private fun addContinuation(continuation: Continuation<TSubject>) {
suspensions[++lastSuspensionIndex] = continuation
}
}