12
12
//
13
13
//===----------------------------------------------------------------------===//
14
14
15
+ import Logging
15
16
import NIOCore
16
17
import NIOHTTP1
17
18
import NIOPosix
18
- import Logging
19
19
import _NIOBase64
20
20
21
21
final actor NewLambdaRuntimeClient : LambdaRuntimeClientProtocol {
@@ -36,15 +36,15 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
36
36
func write( _ buffer: NIOCore . ByteBuffer ) async throws {
37
37
try await self . runtimeClient. write ( buffer)
38
38
}
39
-
39
+
40
40
func finish( ) async throws {
41
41
try await self . runtimeClient. finish ( )
42
42
}
43
-
43
+
44
44
func writeAndFinish( _ buffer: NIOCore . ByteBuffer ) async throws {
45
45
try await self . runtimeClient. writeAndFinish ( buffer)
46
46
}
47
-
47
+
48
48
func reportError( _ error: any Error ) async throws {
49
49
try await self . runtimeClient. reportError ( error)
50
50
}
@@ -120,7 +120,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
120
120
case . connecting( var array) :
121
121
// Since we do get sequential invocations this case normally should never be hit.
122
122
// We'll support it anyway.
123
- return try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < LambdaChannelHandler , any Error > ) in
123
+ return try await withCheckedThrowingContinuation {
124
+ ( continuation: CheckedContinuation < LambdaChannelHandler , any Error > ) in
124
125
array. append ( continuation)
125
126
self . connectionState = . connecting( array)
126
127
}
@@ -211,16 +212,17 @@ private final class LambdaChannelHandler {
211
212
func nextInvocation( isolation: isolated ( any Actor ) ? = #isolation) async throws -> Invocation {
212
213
switch self . state {
213
214
case . connected( let context, . idle) :
214
- return try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Invocation , any Error > ) in
215
+ return try await withCheckedThrowingContinuation {
216
+ ( continuation: CheckedContinuation < Invocation , any Error > ) in
215
217
self . state = . connected( context, . waitingForNextInvocation( continuation) )
216
218
self . sendNextRequest ( context: context)
217
219
}
218
220
219
221
case . connected( _, . closing) ,
220
- . connected( _, . sendingResponse) ,
221
- . connected( _, . sentResponse) ,
222
- . connected( _, . waitingForNextInvocation) ,
223
- . connected( _, . waitingForResponse) :
222
+ . connected( _, . sendingResponse) ,
223
+ . connected( _, . sentResponse) ,
224
+ . connected( _, . waitingForNextInvocation) ,
225
+ . connected( _, . waitingForResponse) :
224
226
fatalError ( )
225
227
226
228
case . disconnected:
@@ -232,7 +234,7 @@ private final class LambdaChannelHandler {
232
234
func reportError( isolation: isolated ( any Actor ) ? = #isolation, _ error: any Error ) async throws {
233
235
switch self . state {
234
236
case . connected( _, . idle( . none) ) ,
235
- . connected( _, . waitingForNextInvocation) :
237
+ . connected( _, . waitingForNextInvocation) :
236
238
fatalError ( " Invalid state: \( self . state) " )
237
239
238
240
case . connected( let context, . waitingForResponse( let requestID) ) :
@@ -248,14 +250,17 @@ private final class LambdaChannelHandler {
248
250
}
249
251
250
252
case . connected( _, . idle( previousRequestID: . some( let requestID) ) ) ,
251
- . connected( _, . sentResponse( let requestID, _) ) :
253
+ . connected( _, . sentResponse( let requestID, _) ) :
252
254
// The final response has already been sent. The only way to report the unhandled error
253
255
// now is to log it. Normally this library never logs higher than debug, we make an
254
256
// exception here, as there is no other way of reporting the error otherwise.
255
- self . logger. error ( " Unhandled error after stream has finished " , metadata: [
256
- " lambda_request_id " : " \( requestID) " ,
257
- " lambda_error " : " \( String ( describing: error) ) "
258
- ] )
257
+ self . logger. error (
258
+ " Unhandled error after stream has finished " ,
259
+ metadata: [
260
+ " lambda_request_id " : " \( requestID) " ,
261
+ " lambda_error " : " \( String ( describing: error) ) " ,
262
+ ]
263
+ )
259
264
260
265
case . disconnected, . connected( _, . closing) :
261
266
// TODO: throw error here
@@ -266,7 +271,7 @@ private final class LambdaChannelHandler {
266
271
func writeResponseBodyPart( isolation: isolated ( any Actor ) ? = #isolation, _ byteBuffer: ByteBuffer ) async throws {
267
272
switch self . state {
268
273
case . connected( _, . idle( . none) ) ,
269
- . connected( _, . waitingForNextInvocation) :
274
+ . connected( _, . waitingForNextInvocation) :
270
275
fatalError ( " Invalid state: \( self . state) " )
271
276
272
277
case . connected( let context, . waitingForResponse( let requestID) ) :
@@ -277,11 +282,10 @@ private final class LambdaChannelHandler {
277
282
try await self . sendResponseBodyPart ( byteBuffer, sendHeadWithRequestID: nil , context: context)
278
283
279
284
case . connected( _, . idle( previousRequestID: . some( let requestID) ) ) ,
280
- . connected( _, . sentResponse( let requestID, _) ) :
285
+ . connected( _, . sentResponse( let requestID, _) ) :
281
286
// TODO: throw error here – user tries to write after the stream has been finished
282
287
fatalError ( )
283
288
284
-
285
289
case . disconnected, . connected( _, . closing) :
286
290
// TODO: throw error here
287
291
fatalError ( )
@@ -291,7 +295,7 @@ private final class LambdaChannelHandler {
291
295
func finishResponseRequest( isolation: isolated ( any Actor ) ? = #isolation, finalData: ByteBuffer ? ) async throws {
292
296
switch self . state {
293
297
case . connected( _, . idle( . none) ) ,
294
- . connected( _, . waitingForNextInvocation) :
298
+ . connected( _, . waitingForNextInvocation) :
295
299
fatalError ( " Invalid state: \( self . state) " )
296
300
297
301
case . connected( let context, . waitingForResponse( let requestID) ) :
@@ -307,11 +311,10 @@ private final class LambdaChannelHandler {
307
311
}
308
312
309
313
case . connected( _, . idle( previousRequestID: . some( let requestID) ) ) ,
310
- . connected( _, . sentResponse( let requestID, _) ) :
314
+ . connected( _, . sentResponse( let requestID, _) ) :
311
315
// TODO: throw error here – user tries to write after the stream has been finished
312
316
fatalError ( )
313
317
314
-
315
318
case . disconnected, . connected( _, . closing) :
316
319
// TODO: throw error here
317
320
fatalError ( )
@@ -355,14 +358,15 @@ private final class LambdaChannelHandler {
355
358
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
356
359
let url = Consts . invocationURLPrefix + " / " + requestID + Consts. postResponseURLSuffix
357
360
358
- let headers : HTTPHeaders = if byteBuffer? . readableBytes ?? 0 < 6_000_000 {
359
- [
360
- " user-agent " : " Swift-Lambda/Unknown " ,
361
- " content-length " : " \( byteBuffer? . readableBytes ?? 0 ) " ,
362
- ]
363
- } else {
364
- LambdaRuntimeClient . streamingHeaders
365
- }
361
+ let headers : HTTPHeaders =
362
+ if byteBuffer? . readableBytes ?? 0 < 6_000_000 {
363
+ [
364
+ " user-agent " : " Swift-Lambda/Unknown " ,
365
+ " content-length " : " \( byteBuffer? . readableBytes ?? 0 ) " ,
366
+ ]
367
+ } else {
368
+ LambdaRuntimeClient . streamingHeaders
369
+ }
366
370
367
371
let httpRequest = HTTPRequestHead (
368
372
version: . http1_1,
@@ -383,7 +387,12 @@ private final class LambdaChannelHandler {
383
387
}
384
388
385
389
private func sendNextRequest( context: ChannelHandlerContext ) {
386
- let httpRequest = HTTPRequestHead ( version: . http1_1, method: . GET, uri: self . nextInvocationPath, headers: LambdaRuntimeClient . defaultHeaders)
390
+ let httpRequest = HTTPRequestHead (
391
+ version: . http1_1,
392
+ method: . GET,
393
+ uri: self . nextInvocationPath,
394
+ headers: LambdaRuntimeClient . defaultHeaders
395
+ )
387
396
388
397
context. write ( self . wrapOutboundOut ( . head( httpRequest) ) , promise: nil )
389
398
context. write ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
@@ -478,40 +487,40 @@ extension LambdaChannelHandler: ChannelInboundHandler {
478
487
break
479
488
}
480
489
481
- // // As defined in RFC 7230 Section 6.3:
482
- // // HTTP/1.1 defaults to the use of "persistent connections", allowing
483
- // // multiple requests and responses to be carried over a single
484
- // // connection. The "close" connection option is used to signal that a
485
- // // connection will not persist after the current request/response. HTTP
486
- // // implementations SHOULD support persistent connections.
487
- // //
488
- // // That's why we only assume the connection shall be closed if we receive
489
- // // a "connection = close" header.
490
- // let serverCloseConnection =
491
- // response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
492
- //
493
- // let closeConnection = serverCloseConnection || response.head.version != .http1_1
494
- //
495
- // if closeConnection {
496
- // // If we were succeeding the request promise here directly and closing the connection
497
- // // after succeeding the promise we may run into a race condition:
498
- // //
499
- // // The lambda runtime will ask for the next work item directly after a succeeded post
500
- // // response request. The desire for the next work item might be faster than the attempt
501
- // // to close the connection. This will lead to a situation where we try to the connection
502
- // // but the next request has already been scheduled on the connection that we want to
503
- // // close. For this reason we postpone succeeding the promise until the connection has
504
- // // been closed. This codepath will only be hit in the very, very unlikely event of the
505
- // // Lambda control plane demanding to close connection. (It's more or less only
506
- // // implemented to support http1.1 correctly.) This behavior is ensured with the test
507
- // // `LambdaTest.testNoKeepAliveServer`.
508
- // self.state = .waitForConnectionClose(httpResponse, promise)
509
- // _ = context.channel.close()
510
- // return
511
- // } else {
512
- // self.state = .idle
513
- // promise.succeed(httpResponse)
514
- // }
490
+ // // As defined in RFC 7230 Section 6.3:
491
+ // // HTTP/1.1 defaults to the use of "persistent connections", allowing
492
+ // // multiple requests and responses to be carried over a single
493
+ // // connection. The "close" connection option is used to signal that a
494
+ // // connection will not persist after the current request/response. HTTP
495
+ // // implementations SHOULD support persistent connections.
496
+ // //
497
+ // // That's why we only assume the connection shall be closed if we receive
498
+ // // a "connection = close" header.
499
+ // let serverCloseConnection =
500
+ // response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
501
+ //
502
+ // let closeConnection = serverCloseConnection || response.head.version != .http1_1
503
+ //
504
+ // if closeConnection {
505
+ // // If we were succeeding the request promise here directly and closing the connection
506
+ // // after succeeding the promise we may run into a race condition:
507
+ // //
508
+ // // The lambda runtime will ask for the next work item directly after a succeeded post
509
+ // // response request. The desire for the next work item might be faster than the attempt
510
+ // // to close the connection. This will lead to a situation where we try to the connection
511
+ // // but the next request has already been scheduled on the connection that we want to
512
+ // // close. For this reason we postpone succeeding the promise until the connection has
513
+ // // been closed. This codepath will only be hit in the very, very unlikely event of the
514
+ // // Lambda control plane demanding to close connection. (It's more or less only
515
+ // // implemented to support http1.1 correctly.) This behavior is ensured with the test
516
+ // // `LambdaTest.testNoKeepAliveServer`.
517
+ // self.state = .waitForConnectionClose(httpResponse, promise)
518
+ // _ = context.channel.close()
519
+ // return
520
+ // } else {
521
+ // self.state = .idle
522
+ // promise.succeed(httpResponse)
523
+ // }
515
524
}
516
525
517
526
func errorCaught( context: ChannelHandlerContext , error: Error ) {
@@ -524,19 +533,19 @@ extension LambdaChannelHandler: ChannelInboundHandler {
524
533
// fail any pending responses with last error or assume peer disconnected
525
534
context. fireChannelInactive ( )
526
535
527
- // switch self.state {
528
- // case .idle:
529
- // break
530
- //
531
- // case .running(let promise, let timeout):
532
- // self.state = .idle
533
- // timeout?.cancel()
534
- // promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
535
- //
536
- // case .waitForConnectionClose(let response, let promise):
537
- // self.state = .idle
538
- // promise.succeed(response)
539
- // }
536
+ // switch self.state {
537
+ // case .idle:
538
+ // break
539
+ //
540
+ // case .running(let promise, let timeout):
541
+ // self.state = .idle
542
+ // timeout?.cancel()
543
+ // promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
544
+ //
545
+ // case .waitForConnectionClose(let response, let promise):
546
+ // self.state = .idle
547
+ // promise.succeed(response)
548
+ // }
540
549
}
541
550
}
542
551
0 commit comments