Skip to content

Commit 836c066

Browse files
author
Ran Isenberg
committed
feat: Add cloudwatch lambda event support to Parser utility
1 parent fa34d82 commit 836c066

File tree

6 files changed

+174
-1
lines changed

6 files changed

+174
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
from .base import BaseEnvelope
2+
from .cloudwatch import CloudatchEnvelope
23
from .dynamodb import DynamoDBStreamEnvelope
34
from .event_bridge import EventBridgeEnvelope
45
from .sns import SnsEnvelope
56
from .sqs import SqsEnvelope
67

7-
__all__ = ["DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", "SqsEnvelope", "BaseEnvelope"]
8+
__all__ = [
9+
"CloudatchEnvelope",
10+
"DynamoDBStreamEnvelope",
11+
"EventBridgeEnvelope",
12+
"SnsEnvelope",
13+
"SqsEnvelope",
14+
"BaseEnvelope",
15+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Union
3+
4+
from ..models import CloudWatchLogsModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class CloudatchEnvelope(BaseEnvelope):
    """CloudWatch Logs envelope to extract a list of log records.

    The incoming event is validated against ``CloudWatchLogsModel`` first; the
    log events are then read from ``awslogs.decoded_data.logEvents``.

    Each record's ``message`` is a string (available after the payload has been
    base64 decoded and decompressed), though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: all records are parsed the same way, using the single model provided.
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Model
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided, one entry per log event
            and in the same order as they appear in the event
        """
        logger.debug(f"Parsing incoming data with CloudWatch Logs model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        logger.debug(f"Parsing CloudWatch records in `message` with {model}")
        # Every log event message is handed to the base envelope parser with the same model.
        return [
            self._parse(data=record.message, model=model)
            for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]

aws_lambda_powertools/utilities/parser/models/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
12
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
23
from .event_bridge import EventBridgeModel
34
from .ses import SesModel, SesRecordModel
45
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
56
from .sqs import SqsModel, SqsRecordModel
67

78
__all__ = [
9+
"CloudWatchLogsData",
10+
"CloudWatchLogsDecode",
11+
"CloudWatchLogsLogEvent",
12+
"CloudWatchLogsModel",
813
"DynamoDBStreamModel",
914
"EventBridgeModel",
1015
"DynamoDBStreamChangedRecordModel",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import base64
2+
import json
3+
import zlib
4+
from datetime import datetime
5+
from typing import List
6+
7+
from pydantic import BaseModel, Field, validator
8+
9+
10+
class CloudWatchLogsLogEvent(BaseModel):
    """A single log event entry found in a decoded CloudWatch Logs payload."""

    # Identifier of the log event; the name shadows the builtin, hence the noqa.
    id: str  # noqa AA03 VNE003
    # Time of the log event, exposed as a datetime.
    timestamp: datetime
    # Raw log line text.
    message: str
14+
15+
16+
class CloudWatchLogsDecode(BaseModel):
    """Decoded CloudWatch Logs payload: delivery metadata plus the log events."""

    messageType: str
    owner: str
    logGroup: str
    logStream: str
    subscriptionFilters: List[str]
    # The individual log entries carried by this payload.
    logEvents: List[CloudWatchLogsLogEvent]
23+
24+
25+
class CloudWatchLogsData(BaseModel):
    """Wrapper around the encoded ``data`` field of a CloudWatch Logs event.

    The raw ``data`` value is base64 decoded, decompressed and JSON parsed
    before being validated as a ``CloudWatchLogsDecode`` model, which is then
    exposed as ``decoded_data``.
    """

    # NOTE(review): the default of None makes ``data`` optional, so ``decoded_data``
    # may be None when the key is absent -- confirm this is intended.
    decoded_data: CloudWatchLogsDecode = Field(None, alias="data")

    @validator("decoded_data", pre=True)
    def prepare_data(cls, value):
        """Decode the raw ``data`` value into a dict before field validation.

        Raises
        ------
        ValueError
            If the value cannot be base64 decoded, decompressed, or parsed as JSON
        """
        try:
            payload = base64.b64decode(value)
            # wbits of MAX_WBITS | 32 enables automatic zlib/gzip header detection.
            uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
            return json.loads(uncompressed.decode("UTF-8"))
        except Exception as exc:
            # Chain the original error so the root cause stays visible in tracebacks.
            raise ValueError("unable to decompress data") from exc
36+
37+
38+
class CloudWatchLogsModel(BaseModel):
    """Top-level CloudWatch Logs event model holding the ``awslogs`` section."""

    awslogs: CloudWatchLogsData

tests/functional/parser/schemas.py

+5
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel):
7171

7272
class MyAdvancedSnsBusiness(SnsModel):
7373
Records: List[MyAdvancedSnsRecordModel]
74+
75+
76+
class MyCloudWatchBusiness(BaseModel):
    """Test schema with two string fields, used as a user-defined log message model."""

    my_message: str
    user: str
+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import base64
2+
import json
3+
import zlib
4+
from typing import List
5+
6+
import pytest
7+
8+
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
9+
from aws_lambda_powertools.utilities.parser.models import CloudWatchLogsLogEvent, CloudWatchLogsModel
10+
from aws_lambda_powertools.utilities.typing import LambdaContext
11+
from tests.functional.parser.schemas import MyCloudWatchBusiness
12+
from tests.functional.parser.utils import load_event
13+
14+
15+
@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudatchEnvelope)
def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext):
    """Handler under test: expects exactly one parsed business record with known values."""
    assert len(event) == 1
    record: MyCloudWatchBusiness = event[0]
    assert record.my_message == "hello"
    assert record.user == "test"
21+
22+
23+
@event_parser(model=CloudWatchLogsModel)
def handle_cloudwatch_logs_no_envelope(event: CloudWatchLogsModel, _: LambdaContext):
    """Handler under test: checks every field of the fully parsed CloudWatch Logs event."""
    decoded = event.awslogs.decoded_data
    assert decoded.owner == "123456789123"
    assert decoded.logGroup == "testLogGroup"
    assert decoded.logStream == "testLogStream"
    assert decoded.subscriptionFilters == ["testFilter"]
    assert decoded.messageType == "DATA_MESSAGE"

    assert len(decoded.logEvents) == 2
    # (id, timestamp in epoch milliseconds, message) expected for each log event, in order.
    expected_records = (
        ("eventId1", 1440442987000, "[ERROR] First test message"),
        ("eventId2", 1440442987001, "[ERROR] Second test message"),
    )
    for log_record, (expected_id, expected_millis, expected_message) in zip(decoded.logEvents, expected_records):
        assert log_record.id == expected_id
        # Convert the parsed datetime back to epoch milliseconds for comparison.
        convert_time = int(round(log_record.timestamp.timestamp() * 1000))
        assert convert_time == expected_millis
        assert log_record.message == expected_message
42+
43+
44+
def test_validate_event_user_model_with_envelope():
    """Build a compressed, base64 encoded event in memory and run it through the envelope handler."""
    business_payload = {"my_message": "hello", "user": "test"}
    log_event = {"id": "eventId1", "timestamp": 1440442987000, "message": json.dumps(business_payload)}
    decoded_payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789123",
        "logGroup": "testLogGroup",
        "logStream": "testLogStream",
        "subscriptionFilters": ["testFilter"],
        "logEvents": [log_event],
    }
    # Serialize, compress and base64 encode the payload before placing it under awslogs.data.
    serialized = json.dumps(decoded_payload).encode()
    compressed = zlib.compress(serialized, -1)
    event_dict = {"awslogs": {"data": base64.b64encode(compressed)}}

    handle_cloudwatch_logs(event_dict, LambdaContext())
59+
60+
61+
def test_validate_event_does_not_conform_with_user_dict_model():
    """The fixture event must be rejected when parsed with the user business model."""
    fixture_event = load_event("cloudWatchLogEvent.json")
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(fixture_event, LambdaContext())
65+
66+
67+
def test_handle_cloudwatch_trigger_event_no_envelope():
    """The fixture event parses into the full CloudWatch Logs model without an envelope."""
    fixture_event = load_event("cloudWatchLogEvent.json")
    handle_cloudwatch_logs_no_envelope(fixture_event, LambdaContext())
70+
71+
72+
def test_handle_invalid_event_with_envelope():
    """An empty event must fail validation when handled through the envelope."""
    empty_event = {}
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(event=empty_event, context=LambdaContext())

0 commit comments

Comments
 (0)