Skip to content

Commit ddc538d

Browse files
committed
Fix missing host header, implement runtime's run function, fix defaultEventLoop
1 parent a64395c commit ddc538d

File tree

4 files changed

+63
-31
lines changed

4 files changed

+63
-31
lines changed

Sources/AWSLambdaRuntimeCore/NewLambda.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,5 @@ extension Lambda {
4747
}
4848

4949
/// The default EventLoop the Lambda is scheduled on.
50-
package static var defaultEventLoop: any EventLoop {
51-
get {
52-
NIOSingletons.posixEventLoopGroup.next()
53-
}
54-
}
50+
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
5551
}

Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import NIOCore
1919
import Synchronization
2020

2121
package final class NewLambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
22-
let handlerMutex: Mutex<Handler>
22+
let handlerMutex: Mutex<Handler?>
2323
let logger: Logger
2424
let eventLoop: EventLoop
2525

@@ -34,5 +34,35 @@ package final class NewLambdaRuntime<Handler>: Sendable where Handler: Streaming
3434
}
3535

3636
package func run() async throws {
37+
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
38+
throw NewLambdaRuntimeError(code: .cannotStartLambdaRuntime)
39+
}
40+
41+
let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
42+
let ip = String(ipAndPort[0])
43+
let port = Int(ipAndPort[1])!
44+
45+
let handler = self.handlerMutex.withLock { maybeHandler in
46+
defer {
47+
maybeHandler = nil
48+
}
49+
return maybeHandler
50+
}
51+
52+
guard let handler else {
53+
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
54+
}
55+
56+
try await NewLambdaRuntimeClient.withRuntimeClient(
57+
configuration: .init(ip: ip, port: port),
58+
eventLoop: self.eventLoop,
59+
logger: self.logger
60+
) { runtimeClient in
61+
try await Lambda.runLoop(
62+
runtimeClient: runtimeClient,
63+
handler: handler,
64+
logger: self.logger
65+
)
66+
}
3767
}
3868
}

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
303303
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
304304
)
305305
try channel.pipeline.syncOperations.addHandler(
306-
LambdaChannelHandler(delegate: self, logger: self.logger)
306+
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
307307
)
308308
return channel.eventLoop.makeSucceededFuture(())
309309
} catch {
@@ -433,10 +433,32 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
433433
private var reusableErrorBuffer: ByteBuffer?
434434
private let logger: Logger
435435
private let delegate: Delegate
436+
private let configuration: NewLambdaRuntimeClient.Configuration
436437

437-
init(delegate: Delegate, logger: Logger) {
438+
let defaultHeaders: HTTPHeaders
439+
/// These headers must be sent along an invocation or initialization error report
440+
let errorHeaders: HTTPHeaders
441+
/// These headers must be sent along an invocation or initialization error report
442+
let streamingHeaders: HTTPHeaders
443+
444+
init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
438445
self.delegate = delegate
439446
self.logger = logger
447+
self.configuration = configuration
448+
self.defaultHeaders = [
449+
"host": "\(self.configuration.ip):\(self.configuration.port)",
450+
"user-agent": "Swift-Lambda/Unknown",
451+
]
452+
self.errorHeaders = [
453+
"host": "\(self.configuration.ip):\(self.configuration.port)",
454+
"user-agent": "Swift-Lambda/Unknown",
455+
"lambda-runtime-function-error-type": "Unhandled",
456+
]
457+
self.streamingHeaders = [
458+
"host": "\(self.configuration.ip):\(self.configuration.port)",
459+
"user-agent": "Swift-Lambda/Unknown",
460+
"transfer-encoding": "streaming",
461+
]
440462
}
441463

442464
func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
@@ -578,7 +600,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
578600
version: .http1_1,
579601
method: .POST,
580602
uri: url,
581-
headers: NewLambdaRuntimeClient.streamingHeaders
603+
headers: self.streamingHeaders
582604
)
583605

584606
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
@@ -604,11 +626,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
604626
let headers: HTTPHeaders =
605627
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
606628
[
629+
"host": "\(self.configuration.ip):\(self.configuration.port)",
607630
"user-agent": "Swift-Lambda/Unknown",
608631
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
609632
]
610633
} else {
611-
NewLambdaRuntimeClient.streamingHeaders
634+
self.streamingHeaders
612635
}
613636

614637
let httpRequest = HTTPRequestHead(
@@ -634,7 +657,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
634657
version: .http1_1,
635658
method: .GET,
636659
uri: self.nextInvocationPath,
637-
headers: NewLambdaRuntimeClient.defaultHeaders
660+
headers: self.defaultHeaders
638661
)
639662

640663
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
@@ -650,7 +673,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
650673
version: .http1_1,
651674
method: .POST,
652675
uri: url,
653-
headers: NewLambdaRuntimeClient.errorHeaders
676+
headers: self.errorHeaders
654677
)
655678

656679
if self.reusableErrorBuffer == nil {
@@ -797,22 +820,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
797820
context.fireChannelInactive()
798821
}
799822
}
800-
801-
extension NewLambdaRuntimeClient {
802-
static let defaultHeaders: HTTPHeaders = [
803-
"user-agent": "Swift-Lambda/Unknown"
804-
]
805-
806-
/// These headers must be sent along an invocation or initialization error report
807-
static let errorHeaders: HTTPHeaders = [
808-
"user-agent": "Swift-Lambda/Unknown",
809-
"lambda-runtime-function-error-type": "Unhandled",
810-
]
811-
812-
/// These headers must be sent along an invocation or initialization error report
813-
static let streamingHeaders: HTTPHeaders = [
814-
"user-agent": "Swift-Lambda/Unknown",
815-
"transfer-encoding": "streaming",
816-
]
817-
818-
}

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ package struct NewLambdaRuntimeError: Error {
2929
case nextInvocationMissingHeaderDeadline
3030
case nextInvocationMissingHeaderInvokeFuctionARN
3131

32+
case cannotStartLambdaRuntime
33+
case runtimeCanOnlyBeStartedOnce
3234
}
3335

3436
package init(code: Code, underlying: (any Error)? = nil) {

0 commit comments

Comments
 (0)