Skip to content

Add new LambdaRuntime #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 5, 2024
Merged
110 changes: 110 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,113 @@ extension LambdaCodableAdapter {
)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<
Event: Decodable,
Output: Encodable,
Encoder: LambdaOutputEncoder,
Decoder: LambdaEventDecoder
>(
encoder: Encoder,
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
Decoder,
Encoder
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
Decoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable, Output>(
body: @escaping (Event, NewLambdaContext) async throws -> Output,
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
JSONDecoder,
LambdaJSONOutputEncoder<Output>
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable>(
body: @escaping (Event, NewLambdaContext) async throws -> Void,
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
JSONDecoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
10 changes: 10 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,13 @@ where Output == Encoder.Output {
try await self.underlyingStreamWriter.writeAndFinish(outputBuffer)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure.
/// - Parameter body: The handler in the form of a closure.
package convenience init(
body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void
) where Handler == StreamingClosureHandler {
self.init(handler: StreamingClosureHandler(body: body))
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in the handler file I guess.

4 changes: 4 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Dispatch
import Logging
import NIOCore

extension Lambda {
package static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
Expand Down Expand Up @@ -44,4 +45,7 @@ extension Lambda {
}
}
}

/// The default EventLoop the Lambda is scheduled on.
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}
68 changes: 68 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
//import ServiceLifecycle
import Logging
import NIOCore
import Synchronization

package final class NewLambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
let handlerMutex: Mutex<Handler?>
let logger: Logger
let eventLoop: EventLoop

package init(
handler: sending Handler,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerMutex = Mutex(handler)
self.eventLoop = eventLoop
self.logger = logger
}

package func run() async throws {
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
throw NewLambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable)
}

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) }

let handler = self.handlerMutex.withLock { maybeHandler in
defer {
maybeHandler = nil
}
return maybeHandler
}

guard let handler else {
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
}
55 changes: 30 additions & 25 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(
LambdaChannelHandler(delegate: self, logger: self.logger)
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
)
return channel.eventLoop.makeSucceededFuture(())
} catch {
Expand Down Expand Up @@ -433,10 +433,33 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
private let delegate: Delegate
private let configuration: NewLambdaRuntimeClient.Configuration

init(delegate: Delegate, logger: Logger) {
/// These are the default headers that must be sent along an invocation
let defaultHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
let errorHeaders: HTTPHeaders
/// These headers must be sent when streaming a response
let streamingHeaders: HTTPHeaders

init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
self.delegate = delegate
self.logger = logger
self.configuration = configuration
self.defaultHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
]
self.errorHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
self.streamingHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "chunked",
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
Expand Down Expand Up @@ -578,7 +601,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.streamingHeaders
headers: self.streamingHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -604,11 +627,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
NewLambdaRuntimeClient.streamingHeaders
self.streamingHeaders
}

let httpRequest = HTTPRequestHead(
Expand All @@ -634,7 +658,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: NewLambdaRuntimeClient.defaultHeaders
headers: self.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -650,7 +674,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.errorHeaders
headers: self.errorHeaders
)

if self.reusableErrorBuffer == nil {
Expand Down Expand Up @@ -797,22 +821,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
context.fireChannelInactive()
}
}

extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]

/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]

/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]

}
3 changes: 3 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ package struct NewLambdaRuntimeError: Error {
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN

case missingLambdaRuntimeAPIEnvironmentVariable
case runtimeCanOnlyBeStartedOnce
case invalidPort
}

package init(code: Code, underlying: (any Error)? = nil) {
Expand Down
Loading