Skip to content

Commit b5986dd

Browse files
author
Artem Krivonos
committed
Extend framing protocol
1 parent 076dbf5 commit b5986dd

File tree

3 files changed

+87
-11
lines changed

3 files changed

+87
-11
lines changed

awslambdaric/bootstrap.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,14 @@ def __init__(self, log_sink):
256256
logging.Handler.__init__(self)
257257
self.log_sink = log_sink
258258

259-
def emit(self, record):
259+
def emit(self, record: logging.LogRecord):
260260
msg = self.format(record)
261-
self.log_sink.log(msg)
261+
262+
self.log_sink.log(
263+
msg,
264+
log_level=record.levelno,
265+
log_format=getattr(record, "log_format", "TEXT"),
266+
)
262267

263268

264269
class LambdaLoggerFilter(logging.Filter):
@@ -299,7 +304,7 @@ def __enter__(self):
299304
def __exit__(self, exc_type, exc_value, exc_tb):
300305
pass
301306

302-
def log(self, msg):
307+
def log(self, msg, log_level=None, log_format=None):
303308
sys.stdout.write(msg)
304309

305310
def log_error(self, message_lines):
@@ -323,9 +328,19 @@ class FramedTelemetryLogSink(object):
323328
The next 'len' bytes contain the message. The byte order is big-endian.
324329
"""
325330

331+
LEVEL_TO_MASK = {
332+
logging.NOTSET: 0b00000,
333+
logging.DEBUG: 0b01000,
334+
logging.INFO: 0b01100,
335+
logging.WARNING: 0b10000,
336+
logging.ERROR: 0b10100,
337+
logging.FATAL: 0b11000,
338+
}
339+
DEFAULT_LEVEL_MASK = 0b00000
340+
326341
def __init__(self, fd):
327342
self.fd = int(fd)
328-
self.frame_type = 0xA55A0003.to_bytes(4, "big")
343+
self.frame_type = 0xA55A0002
329344

330345
def __enter__(self):
331346
self.file = os.fdopen(self.fd, "wb", 0)
@@ -334,11 +349,28 @@ def __enter__(self):
334349
def __exit__(self, exc_type, exc_value, exc_tb):
335350
self.file.close()
336351

337-
def log(self, msg):
352+
def set_log_level(self, frame_type, log_level):
353+
mask = self.LEVEL_TO_MASK.get(log_level, self.DEFAULT_LEVEL_MASK)
354+
frame_type |= mask
355+
356+
return frame_type
357+
358+
@staticmethod
359+
def set_log_format(frame_type, log_format):
360+
if log_format == "JSON":
361+
mask = 0b00
362+
else:
363+
mask = 0b01
364+
return frame_type | mask
365+
366+
def log(self, msg, log_level=logging.NOTSET, log_format="TEXT"):
367+
frame_type = self.set_log_level(self.frame_type, log_level)
368+
frame_type = self.set_log_format(frame_type, log_format)
369+
338370
encoded_msg = msg.encode("utf8")
339371
timestamp = int(time.time_ns() / 1000) # UNIX timestamp in microseconds
340372
log_msg = (
341-
self.frame_type
373+
frame_type.to_bytes(4, "big")
342374
+ len(encoded_msg).to_bytes(4, "big")
343375
+ timestamp.to_bytes(8, "big")
344376
+ encoded_msg
@@ -347,7 +379,7 @@ def log(self, msg):
347379

348380
def log_error(self, message_lines):
349381
error_message = "\n".join(message_lines)
350-
self.log(error_message)
382+
self.log(error_message, log_level=logging.FATAL)
351383

352384

353385
def update_xray_env_variable(xray_trace_id):

awslambdaric/lambda_runtime_log_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"processName",
3030
"process",
3131
"aws_request_id",
32+
"log_format",
3233
}
3334

3435

@@ -64,6 +65,8 @@ def format_location(record: logging.LogRecord):
6465
return f"{record.pathname}:{record.funcName}:{record.lineno}"
6566

6667
def format(self, record: logging.LogRecord) -> str:
68+
record.log_format = "JSON"
69+
6770
result = {
6871
"timestamp": self.formatTime(record, self.datefmt),
6972
"level": record.levelname,

tests/test_bootstrap.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ def test_log_error_framed_log_sink(self):
938938
content = f.read()
939939

940940
frame_type = int.from_bytes(content[:4], "big")
941-
self.assertEqual(frame_type, 0xA55A0003)
941+
self.assertEqual(frame_type, 0xA55A001B)
942942

943943
length = int.from_bytes(content[4:8], "big")
944944
self.assertEqual(length, len(expected_logged_error.encode("utf8")))
@@ -984,7 +984,7 @@ def test_log_error_indentation_framed_log_sink(self):
984984
content = f.read()
985985

986986
frame_type = int.from_bytes(content[:4], "big")
987-
self.assertEqual(frame_type, 0xA55A0003)
987+
self.assertEqual(frame_type, 0xA55A001B)
988988

989989
length = int.from_bytes(content[4:8], "big")
990990
self.assertEqual(length, len(expected_logged_error.encode("utf8")))
@@ -1027,7 +1027,7 @@ def test_log_error_empty_stacktrace_line_framed_log_sink(self):
10271027
content = f.read()
10281028

10291029
frame_type = int.from_bytes(content[:4], "big")
1030-
self.assertEqual(frame_type, 0xA55A0003)
1030+
self.assertEqual(frame_type, 0xA55A001B)
10311031

10321032
length = int.from_bytes(content[4:8], "big")
10331033
self.assertEqual(length, len(expected_logged_error))
@@ -1064,7 +1064,7 @@ def test_log_error_invokeId_line_framed_log_sink(self):
10641064
content = f.read()
10651065

10661066
frame_type = int.from_bytes(content[:4], "big")
1067-
self.assertEqual(frame_type, 0xA55A0003)
1067+
self.assertEqual(frame_type, 0xA55A001B)
10681068

10691069
length = int.from_bytes(content[4:8], "big")
10701070
self.assertEqual(length, len(expected_logged_error))
@@ -1119,6 +1119,47 @@ def test_create_framed_telemetry_log_sinks(self):
11191119
self.assertEqual(actual.fd, fd)
11201120
self.assertFalse("_LAMBDA_TELEMETRY_LOG_FD" in os.environ)
11211121

1122+
def test_log_level_frame_type(self):
1123+
test_cases = [
1124+
(logging.NOTSET, 0xA55A0003),
1125+
(logging.DEBUG, 0xA55A000B),
1126+
(logging.INFO, 0xA55A000F),
1127+
(logging.WARNING, 0xA55A0013),
1128+
(logging.ERROR, 0xA55A0017),
1129+
(logging.FATAL, 0xA55A001B),
1130+
]
1131+
1132+
for level, expected_frame_type in test_cases:
1133+
with NamedTemporaryFile() as temp_file:
1134+
with bootstrap.FramedTelemetryLogSink(
1135+
os.open(temp_file.name, os.O_CREAT | os.O_RDWR)
1136+
) as ls:
1137+
ls.log("hello world", log_level=level)
1138+
with open(temp_file.name, "rb") as f:
1139+
content = f.read()
1140+
1141+
frame_type = int.from_bytes(content[:4], "big")
1142+
self.assertEqual(frame_type, expected_frame_type, hex(frame_type))
1143+
1144+
def test_log_format_frame_type(self):
1145+
test_cases = [
1146+
("TEXT", 0xA55A0003),
1147+
("WRONG_FORMAT", 0xA55A0003),
1148+
("JSON", 0xA55A0002),
1149+
]
1150+
1151+
for fmt, expected_frame_type in test_cases:
1152+
with NamedTemporaryFile() as temp_file:
1153+
with bootstrap.FramedTelemetryLogSink(
1154+
os.open(temp_file.name, os.O_CREAT | os.O_RDWR)
1155+
) as ls:
1156+
ls.log("hello world", log_format=fmt)
1157+
with open(temp_file.name, "rb") as f:
1158+
content = f.read()
1159+
1160+
frame_type = int.from_bytes(content[:4], "big")
1161+
self.assertEqual(frame_type, expected_frame_type, hex(frame_type))
1162+
11221163
def test_single_frame(self):
11231164
with NamedTemporaryFile() as temp_file:
11241165
message = "hello world\nsomething on a new line!\n"

0 commit comments

Comments
 (0)