7
7
from pydantic import BaseModel , Extra
8
8
from retry import retry
9
9
10
+ from aws_lambda_powertools .shared .constants import LOGGER_LAMBDA_CONTEXT_KEYS
11
+
10
12
11
13
class Log (BaseModel , extra = Extra .allow ):
12
14
level : str
@@ -22,31 +24,68 @@ class Log(BaseModel, extra=Extra.allow):
22
24
xray_trace_id : Optional [str ]
23
25
24
26
27
+ class LogFetcher :
28
+ def __init__ (
29
+ self ,
30
+ function_name : str ,
31
+ start_time : datetime ,
32
+ log_client : Optional [CloudWatchLogsClient ] = None ,
33
+ filter_expression : Optional [str ] = None ,
34
+ ) -> None :
35
+ self .function_name = function_name
36
+ self .start_time = int (start_time .timestamp ())
37
+ self .log_client = log_client or boto3 .client ("logs" )
38
+ self .filter_expression = filter_expression or "message" # Logger message key
39
+ self .log_group = f"/aws/lambda/{ self .function_name } "
40
+ self .logs : List [Log ] = self .get_logs ()
41
+
42
+ def get_logs (self ):
43
+ ret = self .log_client .filter_log_events (
44
+ logGroupName = self .log_group ,
45
+ startTime = self .start_time ,
46
+ filterPattern = self .filter_expression ,
47
+ )
48
+
49
+ if not ret ["events" ]:
50
+ raise ValueError ("Empty response from Cloudwatch Logs. Repeating..." )
51
+
52
+ filtered_logs = []
53
+ for event in ret ["events" ]:
54
+ try :
55
+ message = Log (** json .loads (event ["message" ]))
56
+ except json .decoder .JSONDecodeError :
57
+ continue
58
+ filtered_logs .append (message )
59
+
60
+ return filtered_logs
61
+
62
+ def get_log (self , key : str , value : Optional [any ] = None ) -> List [Log ]:
63
+ logs = []
64
+ for log in self .logs :
65
+ log_value = getattr (log , key , None )
66
+ if value is not None and log_value == value :
67
+ logs .append (log )
68
+ if value is None and getattr (log , key , False ):
69
+ logs .append (log )
70
+ return logs
71
+
72
+ def get_cold_start_log (self ) -> List [Log ]:
73
+ return [log for log in self .logs if log .cold_start ]
74
+
75
+ def have_logger_context_keys (self ) -> bool :
76
+ return all (getattr (log , key , False ) for log in self .logs for key in LOGGER_LAMBDA_CONTEXT_KEYS )
77
+
78
+ def __len__ (self ) -> int :
79
+ return len (self .logs )
80
+
81
+
25
82
@retry (ValueError , delay = 2 , jitter = 1.5 , tries = 10 )
26
83
def get_logs (
27
84
function_name : str ,
28
85
start_time : datetime ,
29
- log_client : Optional [CloudWatchLogsClient ] = None ,
30
86
filter_expression : Optional [str ] = None ,
31
- ) -> List [Log ]:
32
- log_client = log_client or boto3 .client ("logs" )
33
- filter_expression = filter_expression or "message" # Logger message key
34
-
35
- response = log_client .filter_log_events (
36
- logGroupName = f"/aws/lambda/{ function_name } " ,
37
- startTime = int (start_time .timestamp ()),
38
- filterPattern = filter_expression ,
87
+ log_client : Optional [CloudWatchLogsClient ] = None ,
88
+ ) -> LogFetcher :
89
+ return LogFetcher (
90
+ function_name = function_name , start_time = start_time , filter_expression = filter_expression , log_client = log_client
39
91
)
40
-
41
- if not response ["events" ]:
42
- raise ValueError ("Empty response from Cloudwatch Logs. Repeating..." )
43
-
44
- filtered_logs = []
45
- for event in response ["events" ]:
46
- try :
47
- message = Log (** json .loads (event ["message" ]))
48
- except json .decoder .JSONDecodeError :
49
- continue
50
- filtered_logs .append (message )
51
-
52
- return filtered_logs
0 commit comments