Skip to content

Commit 88bd2e0

Browse files
authored
Merge pull request #231 from risenberg-cyberark/cloudwatch
feat: Add cloudwatch lambda event support to Parser utility
2 parents 415bfc6 + ccd2a00 commit 88bd2e0

File tree

7 files changed

+181
-1
lines changed

7 files changed

+181
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
"""Built-in envelopes for extracting payloads from common Lambda event sources."""
from .base import BaseEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .sns import SnsEnvelope
from .sqs import SqsEnvelope

# Kept alphabetical so new envelopes have an obvious insertion point
__all__ = [
    "BaseEnvelope",
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "SnsEnvelope",
    "SqsEnvelope",
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Union
3+
4+
from ..models import CloudWatchLogsModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class CloudWatchLogsEnvelope(BaseEnvelope):
    """CloudWatch Logs envelope to extract a list of log records.

    The event's `awslogs.data` payload is base64 decoded and decompressed by
    `CloudWatchLogsModel`; each decoded log event's `message` is a string,
    though it can also be a JSON-encoded string, and is parsed into the
    model provided.
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Model
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        # NOTE: previous log message said "SNS model" — copy/paste from SnsEnvelope
        logger.debug(f"Parsing incoming data with CloudWatch Logs model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        # Records live in each log event's `message` key (not `body` as in SQS)
        logger.debug(f"Parsing CloudWatch records in `message` with {model}")
        return [
            self._parse(data=record.message, model=model)
            for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]

aws_lambda_powertools/utilities/parser/models/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
from .alb import AlbModel, AlbRequestContext, AlbRequestContextData
2+
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
23
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
34
from .event_bridge import EventBridgeModel
45
from .ses import SesModel, SesRecordModel
56
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
67
from .sqs import SqsModel, SqsRecordModel
78

89
__all__ = [
10+
"CloudWatchLogsData",
11+
"CloudWatchLogsDecode",
12+
"CloudWatchLogsLogEvent",
13+
"CloudWatchLogsModel",
914
"AlbModel",
1015
"AlbRequestContext",
1116
"AlbRequestContextData",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import base64
2+
import json
3+
import logging
4+
import zlib
5+
from datetime import datetime
6+
from typing import List
7+
8+
from pydantic import BaseModel, Field, validator
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class CloudWatchLogsLogEvent(BaseModel):
    """A single decoded CloudWatch Logs event (one log line)."""

    id: str  # noqa AA03 VNE003
    # Epoch milliseconds in the raw payload; pydantic coerces to datetime
    timestamp: datetime
    message: str
17+
18+
19+
class CloudWatchLogsDecode(BaseModel):
    """Decompressed JSON document carried inside `awslogs.data`.

    Field names mirror the CloudWatch Logs subscription payload, hence camelCase.
    """

    messageType: str
    owner: str
    logGroup: str
    logStream: str
    subscriptionFilters: List[str]
    logEvents: List[CloudWatchLogsLogEvent]
26+
27+
28+
class CloudWatchLogsData(BaseModel):
    """Wrapper for the raw `awslogs.data` field: base64-encoded, compressed JSON."""

    # Populated from the incoming "data" key; decoded by the validator below
    decoded_data: CloudWatchLogsDecode = Field(None, alias="data")

    @validator("decoded_data", pre=True)
    def prepare_data(cls, value):
        """Base64-decode and decompress the payload, returning the parsed JSON dict.

        Raises
        ------
        ValueError
            When the payload cannot be decoded, decompressed, or parsed;
            pydantic surfaces this as a ValidationError to the caller.
        """
        try:
            logger.debug("Decoding base64 cloudwatch log data before parsing")
            payload = base64.b64decode(value)
            logger.debug("Decompressing cloudwatch log data before parsing")
            # MAX_WBITS | 32 lets zlib auto-detect gzip or zlib headers
            uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
            return json.loads(uncompressed.decode("utf-8"))
        except Exception as exc:
            # Chain the original exception so the root cause stays in the traceback
            raise ValueError("unable to decompress data") from exc
41+
42+
43+
class CloudWatchLogsModel(BaseModel):
    """Lambda Event Source payload for Amazon CloudWatch Logs."""

    awslogs: CloudWatchLogsData

docs/content/utilities/parser.mdx

+2
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Model name | Description
157157
**EventBridgeModel** | Lambda Event Source payload for Amazon EventBridge
158158
**SqsModel** | Lambda Event Source payload for Amazon SQS
159159
**AlbModel** | Lambda Event Source payload for Amazon Application Load Balancer
160+
**CloudWatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs
160161

161162
You can extend them to include your own models, and yet have all other known fields parsed along the way.
162163

@@ -293,6 +294,7 @@ Envelope name | Behaviour | Return
293294
**DynamoDBStreamEnvelope** | 1. Parses data using `DynamoDBStreamModel`. <br/> 2. Parses records in `NewImage` and `OldImage` keys using your model. <br/> 3. Returns a list with a dictionary containing `NewImage` and `OldImage` keys | `List[Dict[str, Optional[Model]]]`
294295
**EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`. <br/> 2. Parses `detail` key using your model and returns it. | `Model`
295296
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
297+
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudWatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
296298

297299
### Bringing your own envelope
298300

tests/functional/parser/schemas.py

+5
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel):
7171

7272
class MyAdvancedSnsBusiness(SnsModel):
7373
Records: List[MyAdvancedSnsRecordModel]
74+
75+
76+
class MyCloudWatchBusiness(BaseModel):
    """Test model for the JSON document embedded in a CloudWatch log `message`."""

    my_message: str
    user: str
+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import base64
2+
import json
3+
import zlib
4+
from typing import List
5+
6+
import pytest
7+
8+
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
9+
from aws_lambda_powertools.utilities.parser.models import CloudWatchLogsLogEvent, CloudWatchLogsModel
10+
from aws_lambda_powertools.utilities.typing import LambdaContext
11+
from tests.functional.parser.schemas import MyCloudWatchBusiness
12+
from tests.functional.parser.utils import load_event
13+
14+
15+
@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudWatchLogsEnvelope)
def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext):
    """Handler asserting the envelope yields exactly one parsed business record."""
    assert len(event) == 1
    record: MyCloudWatchBusiness = event[0]
    assert record.my_message == "hello"
    assert record.user == "test"
21+
22+
23+
@event_parser(model=CloudWatchLogsModel)
def handle_cloudwatch_logs_no_envelope(event: CloudWatchLogsModel, _: LambdaContext):
    """Handler asserting the raw event decodes into the expected envelope fields."""
    decoded = event.awslogs.decoded_data
    assert decoded.owner == "123456789123"
    assert decoded.logGroup == "testLogGroup"
    assert decoded.logStream == "testLogStream"
    assert decoded.subscriptionFilters == ["testFilter"]
    assert decoded.messageType == "DATA_MESSAGE"

    # (id, epoch millis, message) expected for each decoded log event, in order
    expected_events = [
        ("eventId1", 1440442987000, "[ERROR] First test message"),
        ("eventId2", 1440442987001, "[ERROR] Second test message"),
    ]
    assert len(decoded.logEvents) == len(expected_events)
    for log_record, (event_id, epoch_ms, message) in zip(decoded.logEvents, expected_events):
        assert log_record.id == event_id
        # timestamp was parsed from epoch millis; convert back for an exact compare
        assert int(round(log_record.timestamp.timestamp() * 1000)) == epoch_ms
        assert log_record.message == message
42+
43+
44+
def test_validate_event_user_model_with_envelope():
    # Build a CloudWatch Logs event by hand: JSON log line -> compress -> base64
    my_log_message = {"my_message": "hello", "user": "test"}
    inner_event_dict = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789123",
        "logGroup": "testLogGroup",
        "logStream": "testLogStream",
        "subscriptionFilters": ["testFilter"],
        "logEvents": [{"id": "eventId1", "timestamp": 1440442987000, "message": json.dumps(my_log_message)}],
    }
    compressed = zlib.compress(json.dumps(inner_event_dict).encode(), -1)
    event_dict = {"awslogs": {"data": base64.b64encode(compressed)}}

    handle_cloudwatch_logs(event_dict, LambdaContext())
59+
60+
61+
def test_validate_event_does_not_conform_with_user_dict_model():
    # Fixture log messages are plain text, so parsing them as
    # MyCloudWatchBusiness (JSON with my_message/user keys) must fail.
    event_dict = load_event("cloudWatchLogEvent.json")
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(event_dict, LambdaContext())
65+
66+
67+
def test_handle_cloudwatch_trigger_event_no_envelope():
    # Raw fixture event must parse cleanly with CloudWatchLogsModel alone
    event_dict = load_event("cloudWatchLogEvent.json")
    handle_cloudwatch_logs_no_envelope(event_dict, LambdaContext())
70+
71+
72+
def test_handle_invalid_event_with_envelope():
    # An empty event has no `awslogs` key, so envelope parsing must fail
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(event={}, context=LambdaContext())

0 commit comments

Comments
 (0)