-
Notifications
You must be signed in to change notification settings - Fork 113
[Draft] Detached tasks #334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
e02b9b6
31c7845
c6d766d
22483e0
224039a
7934a0f
2491f6f
a059ec5
9804c04
fb6e19e
b87bfed
888af29
c4aff5c
7e647e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the SwiftAWSLambdaRuntime open source project | ||
// | ||
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
import Foundation | ||
import NIOConcurrencyHelpers | ||
import NIOCore | ||
import Logging | ||
|
||
/// A container that allows tasks to finish after a synchronous invocation | ||
/// has produced its response. | ||
public final class DetachedTasksContainer { | ||
|
||
struct Context { | ||
let eventLoop: EventLoop | ||
let logger: Logger | ||
} | ||
|
||
private var context: Context | ||
private var storage: Storage | ||
|
||
init(context: Context) { | ||
self.storage = Storage() | ||
self.context = context | ||
} | ||
|
||
/// Adds a detached task that runs on the given event loop. | ||
/// | ||
/// - Parameters: | ||
/// - name: The name of the task. | ||
/// - task: The task to execute. It receives an `EventLoop` and returns an `EventLoopFuture<Void>`. | ||
/// - Returns: A `RegistrationKey` for the registered task. | ||
@discardableResult | ||
public func detached(name: String, task: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey { | ||
Buratti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let key = RegistrationKey() | ||
let task = task(context.eventLoop).always { _ in | ||
self.storage.remove(key) | ||
} | ||
self.storage.add(key: key, name: name, task: task) | ||
return key | ||
} | ||
|
||
/// Adds a detached async task. | ||
/// | ||
/// - Parameters: | ||
/// - name: The name of the task. | ||
/// - task: The async task to execute. | ||
/// - Returns: A `RegistrationKey` for the registered task. | ||
@discardableResult | ||
public func detached(name: String, task: @Sendable @escaping () async throws -> Void) -> RegistrationKey { | ||
Buratti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let key = RegistrationKey() | ||
let promise = context.eventLoop.makePromise(of: Void.self) | ||
promise.completeWithTask(task) | ||
let task = promise.futureResult.always { result in | ||
switch result { | ||
case .success: | ||
break | ||
case .failure(let failure): | ||
self.context.logger.warning( | ||
"Execution of detached task failed with error.", | ||
metadata: [ | ||
"taskName": "\(name)", | ||
"error": "\(failure)" | ||
] | ||
) | ||
} | ||
self.storage.remove(key) | ||
} | ||
self.storage.add(key: key, name: name, task: task) | ||
return key | ||
} | ||
|
||
/// Informs the runtime that the specified task should not be awaited anymore. | ||
/// | ||
/// - Warning: This method does not actually stop the execution of the | ||
/// detached task, it only prevents the runtime from waiting for it before | ||
/// `/next` is invoked. | ||
/// | ||
/// - Parameter key: The `RegistrationKey` of the task to cancel. | ||
public func unsafeCancel(_ key: RegistrationKey) { | ||
Buratti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// To discuss: | ||
// Canceling the execution doesn't seem to be an easy | ||
// task https://github.com/apple/swift-nio/issues/2087 | ||
// | ||
// While removing the handler will allow the runtime | ||
// to invoke `/next` without actually awaiting for the | ||
// task to complete, it does not actually cancel | ||
// the execution of the dispatched task. | ||
// Since this is a bit counter-intuitive, we might not | ||
// want this method to exist at all. | ||
self.storage.remove(key) | ||
} | ||
|
||
/// Awaits all registered tasks to complete. | ||
/// | ||
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished. | ||
internal func awaitAll() -> EventLoopFuture<Void> { | ||
let tasks = storage.tasks | ||
if tasks.isEmpty { | ||
return context.eventLoop.makeSucceededVoidFuture() | ||
} else { | ||
return EventLoopFuture.andAllComplete(tasks.map(\.value.task), on: context.eventLoop).flatMap { | ||
self.awaitAll() | ||
} | ||
} | ||
} | ||
} | ||
|
||
extension DetachedTasksContainer { | ||
/// Lambda detached task registration key. | ||
public struct RegistrationKey: Hashable, CustomStringConvertible { | ||
Buratti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var value: String | ||
|
||
init() { | ||
// UUID basically | ||
self.value = UUID().uuidString | ||
} | ||
|
||
public var description: String { | ||
self.value | ||
} | ||
} | ||
} | ||
|
||
extension DetachedTasksContainer { | ||
fileprivate final class Storage { | ||
private let lock: NIOLock | ||
Buratti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private var map: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)] | ||
|
||
init() { | ||
self.lock = .init() | ||
self.map = [:] | ||
} | ||
|
||
func add(key: RegistrationKey, name: String, task: EventLoopFuture<Void>) { | ||
self.lock.withLock { | ||
self.map[key] = (name: name, task: task) | ||
} | ||
} | ||
|
||
func remove(_ key: RegistrationKey) { | ||
self.lock.withLock { | ||
self.map[key] = nil | ||
} | ||
} | ||
|
||
var tasks: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)] { | ||
self.lock.withLock { | ||
self.map | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Ideally this would not be @unchecked Sendable, but Sendable checks do not understand locks | ||
// We can transition this to an actor once we drop support for older Swift versions | ||
extension DetachedTasksContainer: @unchecked Sendable {} | ||
extension DetachedTasksContainer.Storage: @unchecked Sendable {} | ||
extension DetachedTasksContainer.RegistrationKey: Sendable {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We must make sure that all those types are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we can, I took inspiration from Terminator.swift for that part, but I'll change it. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,13 +95,19 @@ internal final class LambdaRunner { | |
if case .failure(let error) = result { | ||
logger.warning("lambda handler returned an error: \(error)") | ||
} | ||
return (invocation, result) | ||
return (invocation, result, context) | ||
} | ||
}.flatMap { invocation, result in | ||
}.flatMap { invocation, result, context in | ||
// 3. report results to runtime engine | ||
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in | ||
logger.error("could not report results to lambda runtime engine: \(error)") | ||
} | ||
// To discuss: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only point that remains open @fabianfett. When the runtime fails to report a result to AWS Lambda, do we want to wait for the background tasks to complete before stopping the execution of the whole process? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the error case we should await all subtasks. |
||
// Do we want to await the tasks in this case? | ||
return context.tasks.awaitAll() | ||
}.map { _ in context } | ||
} | ||
.flatMap { context in | ||
context.tasks.awaitAll() | ||
} | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.