Skip to content

Commit e02b9b6

Browse files
committed
First prototype
1 parent cb09b95 commit e02b9b6

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import Foundation
15+
import NIOConcurrencyHelpers
16+
import NIOCore
17+
import Logging
18+
19+
/// A container that allows tasks to finish after a synchronous invocation
20+
/// has produced its response.
21+
public final class DetachedTasksContainer {
22+
23+
struct Context {
24+
let eventLoop: EventLoop
25+
let logger: Logger
26+
}
27+
28+
private var context: Context
29+
private var storage: Storage
30+
31+
init(context: Context) {
32+
self.storage = Storage()
33+
self.context = context
34+
}
35+
36+
/// Adds a detached task that runs on the given event loop.
37+
///
38+
/// - Parameters:
39+
/// - name: The name of the task.
40+
/// - task: The task to execute. It receives an `EventLoop` and returns an `EventLoopFuture<Void>`.
41+
/// - Returns: A `RegistrationKey` for the registered task.
42+
@discardableResult
43+
public func detached(name: String, task: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
44+
let key = RegistrationKey()
45+
let task = task(context.eventLoop).always { _ in
46+
self.storage.remove(key)
47+
}
48+
self.storage.add(key: key, name: name, task: task)
49+
return key
50+
}
51+
52+
/// Adds a detached async task.
53+
///
54+
/// - Parameters:
55+
/// - name: The name of the task.
56+
/// - task: The async task to execute.
57+
/// - Returns: A `RegistrationKey` for the registered task.
58+
@discardableResult
59+
public func detached(name: String, task: @Sendable @escaping () async throws -> Void) -> RegistrationKey {
60+
let key = RegistrationKey()
61+
let promise = context.eventLoop.makePromise(of: Void.self)
62+
promise.completeWithTask(task)
63+
let task = promise.futureResult.always { result in
64+
switch result {
65+
case .success:
66+
break
67+
case .failure(let failure):
68+
self.context.logger.warning(
69+
"Execution of detached task failed with error.",
70+
metadata: [
71+
"taskName": "\(name)",
72+
"error": "\(failure)"
73+
]
74+
)
75+
}
76+
self.storage.remove(key)
77+
}
78+
self.storage.add(key: key, name: name, task: task)
79+
return key
80+
}
81+
82+
/// Informs the runtime that the specified task should not be awaited anymore.
83+
///
84+
/// - Warning: This method does not actually stop the execution of the
85+
/// detached task, it only prevents the runtime from waiting for it before
86+
/// `/next` is invoked.
87+
///
88+
/// - Parameter key: The `RegistrationKey` of the task to cancel.
89+
public func unsafeCancel(_ key: RegistrationKey) {
90+
// To discuss:
91+
// Canceling the execution doesn't seem to be an easy
92+
// task https://github.com/apple/swift-nio/issues/2087
93+
//
94+
// While removing the handler will allow the runtime
95+
// to invoke `/next` without actually awaiting for the
96+
// task to complete, it does not actually cancel
97+
// the execution of the dispatched task.
98+
// Since this is a bit counter-intuitive, we might not
99+
// want this method to exist at all.
100+
self.storage.remove(key)
101+
}
102+
103+
/// Awaits all registered tasks to complete.
104+
///
105+
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
106+
internal func awaitAll() -> EventLoopFuture<Void> {
107+
let tasks = storage.tasks
108+
if tasks.isEmpty {
109+
return context.eventLoop.makeSucceededVoidFuture()
110+
} else {
111+
return EventLoopFuture.andAllComplete(tasks.map(\.value.task), on: context.eventLoop).flatMap {
112+
self.awaitAll()
113+
}
114+
}
115+
}
116+
}
117+
118+
extension DetachedTasksContainer {
119+
/// Lambda detached task registration key.
120+
public struct RegistrationKey: Hashable, CustomStringConvertible {
121+
var value: String
122+
123+
init() {
124+
// UUID basically
125+
self.value = UUID().uuidString
126+
}
127+
128+
public var description: String {
129+
self.value
130+
}
131+
}
132+
}
133+
134+
extension DetachedTasksContainer {
135+
fileprivate final class Storage {
136+
private let lock: NIOLock
137+
138+
private var map: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)]
139+
140+
init() {
141+
self.lock = .init()
142+
self.map = [:]
143+
}
144+
145+
func add(key: RegistrationKey, name: String, task: EventLoopFuture<Void>) {
146+
self.lock.withLock {
147+
self.map[key] = (name: name, task: task)
148+
}
149+
}
150+
151+
func remove(_ key: RegistrationKey) {
152+
self.lock.withLock {
153+
self.map[key] = nil
154+
}
155+
}
156+
157+
var tasks: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)] {
158+
self.lock.withLock {
159+
self.map
160+
}
161+
}
162+
}
163+
}
164+
165+
// Ideally this would not be @unchecked Sendable, but Sendable checks do not understand locks
166+
// We can transition this to an actor once we drop support for older Swift versions
167+
extension DetachedTasksContainer: @unchecked Sendable {}
168+
extension DetachedTasksContainer.Storage: @unchecked Sendable {}
169+
extension DetachedTasksContainer.RegistrationKey: Sendable {}

Sources/AWSLambdaRuntimeCore/LambdaContext.swift

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
8181
let logger: Logger
8282
let eventLoop: EventLoop
8383
let allocator: ByteBufferAllocator
84+
let tasks: DetachedTasksContainer
8485

8586
init(
8687
requestID: String,
@@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
9192
clientContext: String?,
9293
logger: Logger,
9394
eventLoop: EventLoop,
94-
allocator: ByteBufferAllocator
95+
allocator: ByteBufferAllocator,
96+
tasks: DetachedTasksContainer
9597
) {
9698
self.requestID = requestID
9799
self.traceID = traceID
@@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
102104
self.logger = logger
103105
self.eventLoop = eventLoop
104106
self.allocator = allocator
107+
self.tasks = tasks
105108
}
106109
}
107110

@@ -158,6 +161,10 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
158161
public var allocator: ByteBufferAllocator {
159162
self.storage.allocator
160163
}
164+
165+
public var tasks: DetachedTasksContainer {
166+
self.storage.tasks
167+
}
161168

162169
init(requestID: String,
163170
traceID: String,
@@ -177,7 +184,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
177184
clientContext: clientContext,
178185
logger: logger,
179186
eventLoop: eventLoop,
180-
allocator: allocator
187+
allocator: allocator,
188+
tasks: DetachedTasksContainer(
189+
context: DetachedTasksContainer.Context(
190+
eventLoop: eventLoop,
191+
logger: logger
192+
)
193+
)
181194
)
182195
}
183196

@@ -209,7 +222,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
209222
deadline: .now() + timeout,
210223
logger: logger,
211224
eventLoop: eventLoop,
212-
allocator: ByteBufferAllocator()
225+
allocator: ByteBufferAllocator(),
226+
tasks: DetachedTasksContainer(
227+
context: DetachedTasksContainer.Context(
228+
eventLoop: eventLoop,
229+
logger: logger
230+
)
231+
)
213232
)
214233
}
215234
}

Sources/AWSLambdaRuntimeCore/LambdaRunner.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,19 @@ internal final class LambdaRunner {
9595
if case .failure(let error) = result {
9696
logger.warning("lambda handler returned an error: \(error)")
9797
}
98-
return (invocation, result)
98+
return (invocation, result, context)
9999
}
100-
}.flatMap { invocation, result in
100+
}.flatMap { invocation, result, context in
101101
// 3. report results to runtime engine
102102
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
103103
logger.error("could not report results to lambda runtime engine: \(error)")
104-
}
104+
// To discuss:
105+
// Do we want to await the tasks in this case?
106+
return context.tasks.awaitAll()
107+
}.map { _ in context }
108+
}
109+
.flatMap { context in
110+
context.tasks.awaitAll()
105111
}
106112
}
107113

0 commit comments

Comments
 (0)