1
1
import json
2
2
import logging
3
3
import os
4
- from typing import Dict , Iterable , Optional , Union
4
+ import time
5
+ from abc import ABCMeta , abstractmethod
6
+ from functools import partial
7
+ from typing import Any , Callable , Dict , Iterable , List , Optional , Tuple , Union
5
8
6
9
from ..shared import constants
7
10
8
- STD_LOGGING_KEYS = (
11
+ RESERVED_LOG_ATTRS = (
9
12
"name" ,
10
13
"msg" ,
11
14
"args" ,
15
+ "level" ,
12
16
"levelname" ,
13
17
"levelno" ,
14
18
"pathname" ,
27
31
"processName" ,
28
32
"process" ,
29
33
"asctime" ,
34
+ "location" ,
35
+ "timestamp" ,
30
36
)
31
37
32
38
33
- class JsonFormatter (logging .Formatter ):
34
- """AWS Lambda Logging formatter.
39
+ class BasePowertoolsFormatter (logging .Formatter , metaclass = ABCMeta ):
40
+ @abstractmethod
41
+ def append_keys (self , ** additional_keys ):
42
+ raise NotImplementedError ()
35
43
36
- Formats the log message as a JSON encoded string. If the message is a
37
- dict it will be used directly. If the message can be parsed as JSON, then
38
- the parse d value is used in the output record.
44
+ @ abstractmethod
45
+ def remove_keys ( self , keys : Iterable [ str ]):
46
+ raise NotImplementedError ()
39
47
40
- Originally taken from https://gitlab.com/hadrien/aws_lambda_logging/
41
48
42
- """
49
+ class LambdaPowertoolsFormatter (BasePowertoolsFormatter ):
50
+ """AWS Lambda Powertools Logging formatter.
43
51
44
- def __init__ (self , ** kwargs ):
45
- """Return a JsonFormatter instance.
52
+ Formats the log message as a JSON encoded string. If the message is a
53
+ dict it will be used directly.
54
+ """
46
55
47
- The `json_default` kwarg is used to specify a formatter for otherwise
48
- unserializable values. It must not throw. Defaults to a function that
49
- coerces the value to a string.
56
+ default_time_format = "%Y-%m-%d %H:%M:%S.%F%z" # '2021-04-17 18:19:57.656+0200'
57
+ custom_ms_time_directive = "%F"
58
+
59
+ def __init__ (
60
+ self ,
61
+ json_serializer : Optional [Callable [[Any ], Any ]] = None ,
62
+ json_deserializer : Optional [Callable [[Any ], Any ]] = None ,
63
+ json_default : Optional [Callable [[Any ], Any ]] = None ,
64
+ datefmt : str = None ,
65
+ log_record_order : List [str ] = None ,
66
+ utc : bool = False ,
67
+ ** kwargs
68
+ ):
69
+ """Return a LambdaPowertoolsFormatter instance.
50
70
51
71
The `log_record_order` kwarg is used to specify the order of the keys used in
52
72
the structured json logs. By default the order is: "level", "location", "message", "timestamp",
53
73
"service" and "sampling_rate".
54
74
55
75
Other kwargs are used to specify log field format strings.
76
+
77
+ Parameters
78
+ ----------
79
+ json_serializer : Callable, optional
80
+ function to serialize `obj` to a JSON formatted `str`, by default json.dumps
81
+ json_deserializer : Callable, optional
82
+ function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`,
83
+ by default json.loads
84
+ json_default : Callable, optional
85
+ function to coerce unserializable values, by default str
86
+
87
+ Only used when no custom JSON encoder is set
88
+
89
+ datefmt : str, optional
90
+ String directives (strftime) to format log timestamp
91
+
92
+ See https://docs.python.org/3/library/time.html#time.strftime
93
+ utc : bool, optional
94
+ set logging timestamp to UTC, by default False to continue to use local time as per stdlib
95
+ log_record_order : list, optional
96
+ set order of log keys when logging, by default ["level", "location", "message", "timestamp"]
97
+ kwargs
98
+ Key-value to be included in log messages
56
99
"""
57
- # Set the default unserializable function, by default values will be cast as str.
58
- self .default_json_formatter = kwargs .pop ("json_default" , str )
59
- # Set the insertion order for the log messages
60
- self .log_format = dict .fromkeys (kwargs .pop ("log_record_order" , ["level" , "location" , "message" , "timestamp" ]))
61
- self .reserved_keys = ["timestamp" , "level" , "location" ]
62
- # Set the date format used by `asctime`
63
- super (JsonFormatter , self ).__init__ (datefmt = kwargs .pop ("datefmt" , None ))
100
+ self .json_deserializer = json_deserializer or json .loads
101
+ self .json_default = json_default or str
102
+ self .json_serializer = json_serializer or partial (json .dumps , default = self .json_default , separators = ("," , ":" ))
103
+ self .datefmt = datefmt
104
+ self .utc = utc
105
+ self .log_record_order = log_record_order or ["level" , "location" , "message" , "timestamp" ]
106
+ self .log_format = dict .fromkeys (self .log_record_order ) # Set the insertion order for the log messages
107
+ self .update_formatter = self .append_keys # alias to old method
64
108
65
- self .log_format .update (self ._build_root_keys (** kwargs ))
109
+ if self .utc :
110
+ self .converter = time .gmtime
111
+
112
+ super (LambdaPowertoolsFormatter , self ).__init__ (datefmt = self .datefmt )
113
+
114
+ keys_combined = {** self ._build_default_keys (), ** kwargs }
115
+ self .log_format .update (** keys_combined )
116
+
117
+ def format (self , record : logging .LogRecord ) -> str : # noqa: A003
118
+ """Format logging record as structured JSON str"""
119
+ formatted_log = self ._extract_log_keys (log_record = record )
120
+ formatted_log ["message" ] = self ._extract_log_message (log_record = record )
121
+ formatted_log ["exception" ], formatted_log ["exception_name" ] = self ._extract_log_exception (log_record = record )
122
+ formatted_log ["xray_trace_id" ] = self ._get_latest_trace_id ()
123
+ formatted_log = self ._strip_none_records (records = formatted_log )
124
+
125
+ return self .json_serializer (formatted_log )
126
+
127
+ def formatTime (self , record : logging .LogRecord , datefmt : Optional [str ] = None ) -> str :
128
+ record_ts = self .converter (record .created )
129
+ if datefmt :
130
+ return time .strftime (datefmt , record_ts )
131
+
132
+ # NOTE: Python `time.strftime` doesn't provide msec directives
133
+ # so we create a custom one (%F) and replace logging record ts
134
+ # Reason 2 is that std logging doesn't support msec after TZ
135
+ msecs = "%03d" % record .msecs
136
+ custom_fmt = self .default_time_format .replace (self .custom_ms_time_directive , msecs )
137
+ return time .strftime (custom_fmt , record_ts )
138
+
139
+ def append_keys (self , ** additional_keys ):
140
+ self .log_format .update (additional_keys )
141
+
142
+ def remove_keys (self , keys : Iterable [str ]):
143
+ for key in keys :
144
+ self .log_format .pop (key , None )
66
145
67
146
@staticmethod
68
- def _build_root_keys ( ** kwargs ):
147
+ def _build_default_keys ( ):
69
148
return {
70
149
"level" : "%(levelname)s" ,
71
150
"location" : "%(funcName)s:%(lineno)d" ,
72
151
"timestamp" : "%(asctime)s" ,
73
- ** kwargs ,
74
152
}
75
153
76
154
@staticmethod
77
155
def _get_latest_trace_id ():
78
156
xray_trace_id = os .getenv (constants .XRAY_TRACE_ID_ENV )
79
157
return xray_trace_id .split (";" )[0 ].replace ("Root=" , "" ) if xray_trace_id else None
80
158
81
- def update_formatter (self , ** kwargs ):
82
- self .log_format .update (kwargs )
83
-
84
- @staticmethod
85
- def _extract_log_message (log_record : logging .LogRecord ) -> Union [Dict , str , bool , Iterable ]:
86
- """Extract message from log record and attempt to JSON decode it
159
+ def _extract_log_message (self , log_record : logging .LogRecord ) -> Union [Dict [str , Any ], str , bool , Iterable ]:
160
+ """Extract message from log record and attempt to JSON decode it if str
87
161
88
162
Parameters
89
163
----------
@@ -95,20 +169,19 @@ def _extract_log_message(log_record: logging.LogRecord) -> Union[Dict, str, bool
95
169
message: Union[Dict, str, bool, Iterable]
96
170
Extracted message
97
171
"""
98
- if isinstance (log_record .msg , dict ):
99
- return log_record .msg
172
+ message = log_record .msg
173
+ if isinstance (message , dict ):
174
+ return message
100
175
101
- message : str = log_record .getMessage ()
102
-
103
- # Attempt to decode non-str messages e.g. msg = '{"x": "y"}'
104
- try :
105
- message = json .loads (log_record .msg )
106
- except (json .decoder .JSONDecodeError , TypeError , ValueError ):
107
- pass
176
+ if isinstance (message , str ): # could be a JSON string
177
+ try :
178
+ message = self .json_deserializer (message )
179
+ except (json .decoder .JSONDecodeError , TypeError , ValueError ):
180
+ pass
108
181
109
182
return message
110
183
111
- def _extract_log_exception (self , log_record : logging .LogRecord ) -> Optional [ str ]:
184
+ def _extract_log_exception (self , log_record : logging .LogRecord ) -> Union [ Tuple [ str , str ], Tuple [ None , None ] ]:
112
185
"""Format traceback information, if available
113
186
114
187
Parameters
@@ -118,33 +191,15 @@ def _extract_log_exception(self, log_record: logging.LogRecord) -> Optional[str]
118
191
119
192
Returns
120
193
-------
121
- log_record: Optional[str]
122
- Log record with constant traceback info
194
+ log_record: Optional[Tuple[ str, str] ]
195
+ Log record with constant traceback info and exception name
123
196
"""
124
197
if log_record .exc_info :
125
- return self .formatException (log_record .exc_info )
126
-
127
- return None
198
+ return self .formatException (log_record .exc_info ), log_record .exc_info [0 ].__name__
128
199
129
- def _extract_log_exception_name (self , log_record : logging .LogRecord ) -> Optional [str ]:
130
- """Extract the exception name, if available
200
+ return None , None
131
201
132
- Parameters
133
- ----------
134
- log_record : logging.LogRecord
135
- Log record to extract exception name from
136
-
137
- Returns
138
- -------
139
- log_record: Optional[str]
140
- Log record with exception name
141
- """
142
- if log_record .exc_info :
143
- return log_record .exc_info [0 ].__name__
144
-
145
- return None
146
-
147
- def _extract_log_keys (self , log_record : logging .LogRecord ) -> Dict :
202
+ def _extract_log_keys (self , log_record : logging .LogRecord ) -> Dict [str , Any ]:
148
203
"""Extract and parse custom and reserved log keys
149
204
150
205
Parameters
@@ -157,36 +212,27 @@ def _extract_log_keys(self, log_record: logging.LogRecord) -> Dict:
157
212
formatted_log: Dict
158
213
Structured log as dictionary
159
214
"""
160
- record_dict = log_record .__dict__ .copy () # has extra kwargs we are after
161
- record_dict ["asctime" ] = self .formatTime (log_record , self .datefmt )
215
+ record_dict = log_record .__dict__ .copy ()
216
+ record_dict ["asctime" ] = self .formatTime (record = log_record , datefmt = self .datefmt )
217
+ extras = {k : v for k , v in record_dict .items () if k not in RESERVED_LOG_ATTRS }
162
218
163
- formatted_log = {}
219
+ formatted_log = {** extras }
164
220
165
- # We have to iterate over a default or existing log structure
166
- # then replace any logging expression for reserved keys e.g. '%(level)s' to 'INFO'
167
- # and lastly add or replace incoming keys (those added within the constructor or .structure_logs method)
221
+ # Iterate over a default or existing log structure
222
+ # then replace any std log attribute e.g. '%(level)s' to 'INFO', '%(process)d to '4773 '
223
+ # lastly add or replace incoming keys (those added within the constructor or .structure_logs method)
168
224
for key , value in self .log_format .items ():
169
- if value and key in self . reserved_keys :
225
+ if value and key in RESERVED_LOG_ATTRS :
170
226
formatted_log [key ] = value % record_dict
171
227
else :
172
228
formatted_log [key ] = value
173
229
174
- # pick up extra keys when logging a new message e.g. log.info("my message", extra={"additional_key": "value"}
175
- # these messages will be added to the root of the final structure not within `message` key
176
- for key , value in record_dict .items ():
177
- if key not in STD_LOGGING_KEYS :
178
- formatted_log [key ] = value
179
-
180
230
return formatted_log
181
231
182
- def format (self , record ): # noqa: A003
183
- formatted_log = self ._extract_log_keys (log_record = record )
184
- formatted_log ["message" ] = self ._extract_log_message (log_record = record )
185
- formatted_log ["exception_name" ] = self ._extract_log_exception_name (log_record = record )
186
- formatted_log ["exception" ] = self ._extract_log_exception (log_record = record )
187
- formatted_log .update ({"xray_trace_id" : self ._get_latest_trace_id ()}) # fetch latest Trace ID, if any
232
+ @staticmethod
233
+ def _strip_none_records (records : Dict [str , Any ]) -> Dict [str , Any ]:
234
+ """Remove any key with None as value"""
235
+ return {k : v for k , v in records .items () if v is not None }
188
236
189
- # Filter out top level key with values that are None
190
- formatted_log = {k : v for k , v in formatted_log .items () if v is not None }
191
237
192
- return json . dumps ( formatted_log , default = self . default_json_formatter )
238
+ JsonFormatter = LambdaPowertoolsFormatter # alias to previous formatter
0 commit comments