1
+ import functools
2
+ import time
3
+ from concurrent .futures import Future , ThreadPoolExecutor
1
4
from datetime import datetime
2
- from typing import Optional , Tuple
5
+ from typing import List , Optional , Tuple
3
6
4
7
import boto3
5
8
import requests
6
9
from mypy_boto3_lambda import LambdaClient
7
10
from mypy_boto3_lambda .type_defs import InvocationResponseTypeDef
11
+ from pydantic import BaseModel
8
12
from requests import Request , Response
9
13
from requests .exceptions import RequestException
10
14
from retry import retry
11
15
16
+ GetLambdaResponse = Tuple [InvocationResponseTypeDef , datetime ]
17
+
18
+
19
+ class GetLambdaResponseOptions (BaseModel ):
20
+ lambda_arn : str
21
+ payload : Optional [str ] = None
22
+ client : Optional [LambdaClient ] = None
23
+ raise_on_error : bool = True
24
+
25
+ # Maintenance: Pydantic v2 deprecated it; we should update in v3
26
+ class Config :
27
+ arbitrary_types_allowed = True
28
+
12
29
13
30
def get_lambda_response (
14
31
lambda_arn : str ,
15
32
payload : Optional [str ] = None ,
16
33
client : Optional [LambdaClient ] = None ,
17
- ) -> Tuple [InvocationResponseTypeDef , datetime ]:
34
+ raise_on_error : bool = True ,
35
+ ) -> GetLambdaResponse :
36
+ """Invoke function synchronously
37
+
38
+ Parameters
39
+ ----------
40
+ lambda_arn : str
41
+ Lambda function ARN to invoke
42
+ payload : Optional[str], optional
43
+ JSON payload for Lambda invocation, by default None
44
+ client : Optional[LambdaClient], optional
45
+ Boto3 Lambda SDK client, by default None
46
+ raise_on_error : bool, optional
47
+ Whether to raise exception upon invocation error, by default True
48
+
49
+ Returns
50
+ -------
51
+ Tuple[InvocationResponseTypeDef, datetime]
52
+ Function response and approximate execution time
53
+
54
+ Raises
55
+ ------
56
+ RuntimeError
57
+ Function invocation error details
58
+ """
18
59
client = client or boto3 .client ("lambda" )
19
60
payload = payload or ""
20
61
execution_time = datetime .utcnow ()
21
- return client .invoke (FunctionName = lambda_arn , InvocationType = "RequestResponse" , Payload = payload ), execution_time
62
+ response : InvocationResponseTypeDef = client .invoke (
63
+ FunctionName = lambda_arn ,
64
+ InvocationType = "RequestResponse" ,
65
+ Payload = payload ,
66
+ )
67
+
68
+ has_error = response .get ("FunctionError" , "" ) == "Unhandled"
69
+ if has_error and raise_on_error :
70
+ error_payload = response ["Payload" ].read ().decode ()
71
+ raise RuntimeError (f"Function failed invocation: { error_payload } " )
72
+
73
+ return response , execution_time
22
74
23
75
24
76
@retry (RequestException , delay = 2 , jitter = 1.5 , tries = 5 )
@@ -27,3 +79,39 @@ def get_http_response(request: Request) -> Response:
27
79
result = session .send (request .prepare ())
28
80
result .raise_for_status ()
29
81
return result
82
+
83
+
84
+ def get_lambda_response_in_parallel (
85
+ get_lambda_response_options : List [GetLambdaResponseOptions ],
86
+ ) -> List [GetLambdaResponse ]:
87
+ """Invoke functions in parallel
88
+
89
+ Parameters
90
+ ----------
91
+ get_lambda_response_options : List[GetLambdaResponseOptions]
92
+ List of options to call get_lambda_response with
93
+
94
+ Returns
95
+ -------
96
+ List[GetLambdaResponse]
97
+ Function responses and approximate execution time
98
+ """
99
+ result_list = []
100
+ with ThreadPoolExecutor () as executor :
101
+ running_tasks : List [Future ] = []
102
+ for options in get_lambda_response_options :
103
+ # Sleep 0.5, 1, 1.5, ... seconds between each invocation. This way
104
+ # we can guarantee that lambdas are executed in parallel, but they are
105
+ # called in the same "order" as they are passed in, thus guaranteeing that
106
+ # we can assert on the correct output.
107
+ time .sleep (0.5 * len (running_tasks ))
108
+
109
+ get_lambda_response_callback = functools .partial (get_lambda_response , ** options .dict ())
110
+ running_tasks .append (
111
+ executor .submit (get_lambda_response_callback ),
112
+ )
113
+
114
+ executor .shutdown (wait = True )
115
+ result_list .extend (running_task .result () for running_task in running_tasks )
116
+
117
+ return result_list
0 commit comments