Skip to content

Commit 2b4740a

Browse files
authored
feat(parser): extract CloudWatch Logs in Kinesis streams (#1726)
1 parent 206038e commit 2b4740a

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

aws_lambda_powertools/utilities/parser/models/kinesis.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import List, Type, Union
1+
import json
2+
import zlib
3+
from typing import Dict, List, Type, Union
24

35
from pydantic import BaseModel, validator
46

57
from aws_lambda_powertools.shared.functions import base64_decode
8+
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
9+
CloudWatchLogsDecode,
10+
)
611
from aws_lambda_powertools.utilities.parser.types import Literal
712

813

@@ -28,6 +33,21 @@ class KinesisDataStreamRecord(BaseModel):
2833
eventSourceARN: str
2934
kinesis: KinesisDataStreamRecordPayload
3035

36+
def decompress_zlib_record_data_as_json(self) -> Dict:
    """Inflate the compressed bytes held in ``self.kinesis.data`` and parse them as JSON.

    Returns
    -------
    Dict
        The result of ``json.loads`` applied to the decompressed payload.

    Raises
    ------
    ValueError
        If ``self.kinesis.data`` is not ``bytes`` (e.g. it was replaced by a custom model).
    """
    payload = self.kinesis.data

    if isinstance(payload, bytes):
        # MAX_WBITS | 32 asks zlib to auto-detect a zlib or gzip header
        inflated = zlib.decompress(payload, zlib.MAX_WBITS | 32)
        return json.loads(inflated)

    raise ValueError("We can only decompress bytes data, not custom models.")
42+
3143

3244
# Pydantic model for a whole Kinesis Data Streams event; it wraps the event's list of records.
class KinesisDataStreamModel(BaseModel):
3345
# Every entry is validated as a KinesisDataStreamRecord.
Records: List[KinesisDataStreamRecord]
46+
47+
48+
def extract_cloudwatch_logs_from_event(event: KinesisDataStreamModel) -> List[CloudWatchLogsDecode]:
    """Build one ``CloudWatchLogsDecode`` per record in a Kinesis Data Streams event.

    Each record's data is decompressed, parsed as JSON and used as keyword
    arguments for ``CloudWatchLogsDecode``. Results keep the order of ``event.Records``.

    Raises
    ------
    ValueError
        If any record's ``kinesis.data`` is not ``bytes``.
    """
    return [extract_cloudwatch_logs_from_record(record) for record in event.Records]
50+
51+
52+
def extract_cloudwatch_logs_from_record(record: KinesisDataStreamRecord) -> CloudWatchLogsDecode:
    """Build a ``CloudWatchLogsDecode`` from a single Kinesis Data Streams record.

    The record's data is decompressed and parsed as JSON, and the resulting
    mapping is passed as keyword arguments to ``CloudWatchLogsDecode``.

    Raises
    ------
    ValueError
        If the record's ``kinesis.data`` is not ``bytes``.
    """
    decoded_payload = record.decompress_zlib_record_data_as_json()
    return CloudWatchLogsDecode(**decoded_payload)

tests/functional/parser/test_kinesis.py

+40
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pytest
44

55
from aws_lambda_powertools.utilities.parser import (
6+
BaseModel,
67
ValidationError,
78
envelopes,
89
event_parser,
@@ -11,6 +12,13 @@
1112
KinesisDataStreamModel,
1213
KinesisDataStreamRecordPayload,
1314
)
15+
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
16+
CloudWatchLogsDecode,
17+
)
18+
from aws_lambda_powertools.utilities.parser.models.kinesis import (
19+
extract_cloudwatch_logs_from_event,
20+
extract_cloudwatch_logs_from_record,
21+
)
1422
from aws_lambda_powertools.utilities.typing import LambdaContext
1523
from tests.functional.parser.schemas import MyKinesisBusiness
1624
from tests.functional.utils import load_event
@@ -111,3 +119,35 @@ def test_validate_event_does_not_conform_with_model():
111119
event_dict: Any = {"hello": "s"}
112120
with pytest.raises(ValidationError):
113121
handle_kinesis(event_dict, LambdaContext())
122+
123+
124+
def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
    # GIVEN a KinesisDataStreamModel built from an event carrying compressed CloudWatch Logs data
    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    parsed_event = KinesisDataStreamModel(**raw_event)
    first_record = parsed_event.Records[0]

    # WHEN CloudWatch Logs are extracted from the whole event, from each record, and from one record
    logs_from_event = extract_cloudwatch_logs_from_event(parsed_event)
    logs_per_record = list(map(extract_cloudwatch_logs_from_record, parsed_event.Records))
    log_from_first_record = extract_cloudwatch_logs_from_record(first_record)

    # THEN both extraction paths yield the same number of logs
    # and a single extraction is a CloudWatchLogsDecode model
    assert len(logs_from_event) == len(logs_per_record)
    assert isinstance(log_from_first_record, CloudWatchLogsDecode)
138+
139+
140+
def test_kinesis_stream_event_cloudwatch_logs_data_extraction_fails_with_custom_model():
    # GIVEN a custom model that will stand in for the record's bytes data
    class DummyModel(BaseModel):
        ...

    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    parsed_event = KinesisDataStreamModel(**raw_event)

    # WHEN a record's data is swapped for the custom model and decompression is attempted
    # THEN a ValueError explains that only bytes data can be decompressed
    with pytest.raises(ValueError, match="We can only decompress bytes data"):
        for item in parsed_event.Records:
            item.kinesis.data = DummyModel()
            item.decompress_zlib_record_data_as_json()

0 commit comments

Comments
 (0)