Skip to content

feat: Add cloudwatch lambda event support to Parser utility #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from .base import BaseEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .sns import SnsEnvelope
from .sqs import SqsEnvelope

# Public API of the envelopes package (single authoritative definition;
# the previous duplicate assignment was dead code and has been removed).
__all__ = [
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "SnsEnvelope",
    "SqsEnvelope",
    "BaseEnvelope",
]
42 changes: 42 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from typing import Any, Dict, List, Optional, Union

from ..models import CloudWatchLogsModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class CloudWatchLogsEnvelope(BaseEnvelope):
    """CloudWatch Envelope to extract a List of log records.

    The record's body parameter is a string (after being base64 decoded and gzipped),
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: The record will be parsed the same way so if model is str
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Model
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        # Fixed log messages: previously claimed "SNS model" and the `body` key,
        # but this envelope uses CloudWatchLogsModel and parses each event's `message`.
        logger.debug(f"Parsing incoming data with CloudWatch Logs model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        logger.debug(f"Parsing CloudWatch records in `message` with {model}")
        # Each decoded log event's `message` string is parsed with the user-provided model
        return [
            self._parse(data=record.message, model=model)
            for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]
5 changes: 5 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from .alb import AlbModel, AlbRequestContext, AlbRequestContextData
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
from .ses import SesModel, SesRecordModel
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
from .sqs import SqsModel, SqsRecordModel

__all__ = [
"CloudWatchLogsData",
"CloudWatchLogsDecode",
"CloudWatchLogsLogEvent",
"CloudWatchLogsModel",
"AlbModel",
"AlbRequestContext",
"AlbRequestContextData",
Expand Down
44 changes: 44 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import base64
import json
import logging
import zlib
from datetime import datetime
from typing import List

from pydantic import BaseModel, Field, validator

logger = logging.getLogger(__name__)


class CloudWatchLogsLogEvent(BaseModel):
    """A single log event from a decoded CloudWatch Logs subscription payload."""

    id: str # noqa AA03 VNE003
    # epoch milliseconds in the raw payload; pydantic coerces it to datetime
    timestamp: datetime
    # raw log line; may itself be a JSON-encoded string
    message: str


class CloudWatchLogsDecode(BaseModel):
    """Decompressed JSON payload of a CloudWatch Logs subscription event.

    Field names are camelCase to mirror the raw AWS payload keys.
    """

    messageType: str
    owner: str
    logGroup: str
    logStream: str
    subscriptionFilters: List[str]
    # the individual log records carried by this payload
    logEvents: List[CloudWatchLogsLogEvent]


class CloudWatchLogsData(BaseModel):
    """Carrier for the base64-encoded, compressed `data` field of an `awslogs` event.

    The raw `data` value is decoded and decompressed by the `prepare_data`
    validator before pydantic parses it into `CloudWatchLogsDecode`.
    """

    decoded_data: CloudWatchLogsDecode = Field(None, alias="data")

    @validator("decoded_data", pre=True)
    def prepare_data(cls, value):
        """Base64-decode and decompress the raw payload, returning its JSON as a dict.

        Raises
        ------
        ValueError
            If the value cannot be decoded, decompressed, or JSON-parsed.
        """
        try:
            logger.debug("Decoding base64 cloudwatch log data before parsing")
            payload = base64.b64decode(value)
            logger.debug("Decompressing cloudwatch log data before parsing")
            # wbits=MAX_WBITS | 32 lets zlib auto-detect zlib or gzip headers
            uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
            return json.loads(uncompressed.decode("utf-8"))
        except Exception as exc:
            # Chain the original exception so the root cause is not swallowed
            raise ValueError("unable to decompress data") from exc


class CloudWatchLogsModel(BaseModel):
    """Root model for a CloudWatch Logs Lambda event: {"awslogs": {"data": "<base64 zlib/gzip>"}}."""

    awslogs: CloudWatchLogsData
2 changes: 2 additions & 0 deletions docs/content/utilities/parser.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ Model name | Description
**EventBridgeModel** | Lambda Event Source payload for Amazon EventBridge
**SqsModel** | Lambda Event Source payload for Amazon SQS
**AlbModel** | Lambda Event Source payload for Amazon Application Load Balancer
**CloudWatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs

You can extend them to include your own models, and yet have all other known fields parsed along the way.

Expand Down Expand Up @@ -293,6 +294,7 @@ Envelope name | Behaviour | Return
**DynamoDBStreamEnvelope** | 1. Parses data using `DynamoDBStreamModel`. <br/> 2. Parses records in `NewImage` and `OldImage` keys using your model. <br/> 3. Returns a list with a dictionary containing `NewImage` and `OldImage` keys | `List[Dict[str, Optional[Model]]]`
**EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`. <br/> 2. Parses `detail` key using your model and returns it. | `Model`
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudWatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`

### Bringing your own envelope

Expand Down
5 changes: 5 additions & 0 deletions tests/functional/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel):

class MyAdvancedSnsBusiness(SnsModel):
Records: List[MyAdvancedSnsRecordModel]


class MyCloudWatchBusiness(BaseModel):
    """Test schema for the JSON payload carried in a CloudWatch log event's `message`."""

    my_message: str
    user: str
74 changes: 74 additions & 0 deletions tests/functional/parser/test_cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import base64
import json
import zlib
from typing import List

import pytest

from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import CloudWatchLogsLogEvent, CloudWatchLogsModel
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyCloudWatchBusiness
from tests.functional.parser.utils import load_event


@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudWatchLogsEnvelope)
def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext):
    """Assert the envelope extracted exactly one record with the expected fields."""
    assert len(event) == 1
    record = event[0]
    assert record.my_message == "hello"
    assert record.user == "test"


@event_parser(model=CloudWatchLogsModel)
def handle_cloudwatch_logs_no_envelope(event: CloudWatchLogsModel, _: LambdaContext):
    """Assert the decoded CloudWatch Logs payload fields without an envelope."""
    decoded = event.awslogs.decoded_data
    assert decoded.owner == "123456789123"
    assert decoded.logGroup == "testLogGroup"
    assert decoded.logStream == "testLogStream"
    assert decoded.subscriptionFilters == ["testFilter"]
    assert decoded.messageType == "DATA_MESSAGE"

    # (id, epoch-milliseconds, message) for each expected log event, in order
    expected_events = [
        ("eventId1", 1440442987000, "[ERROR] First test message"),
        ("eventId2", 1440442987001, "[ERROR] Second test message"),
    ]
    assert len(decoded.logEvents) == len(expected_events)
    for log_record, (event_id, epoch_ms, message) in zip(decoded.logEvents, expected_events):
        assert log_record.id == event_id
        assert int(round(log_record.timestamp.timestamp() * 1000)) == epoch_ms
        assert log_record.message == message


def test_validate_event_user_model_with_envelope():
    """Round-trip: zlib-compress and base64-encode a CloudWatch Logs payload,
    then parse it through the CloudWatchLogsEnvelope with a user model."""
    my_log_message = {"my_message": "hello", "user": "test"}
    inner_event_dict = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789123",
        "logGroup": "testLogGroup",
        "logStream": "testLogStream",
        "subscriptionFilters": ["testFilter"],
        "logEvents": [{"id": "eventId1", "timestamp": 1440442987000, "message": json.dumps(my_log_message)}],
    }
    # CloudWatch Logs delivers the payload compressed and base64-encoded
    # (fixed typo'd local name and unidiomatic `str.encode(s)` call)
    compressed_payload = zlib.compress(json.dumps(inner_event_dict).encode("utf-8"), -1)
    event_dict = {"awslogs": {"data": base64.b64encode(compressed_payload)}}

    handle_cloudwatch_logs(event_dict, LambdaContext())


def test_validate_event_does_not_conform_with_user_dict_model():
    """A real CloudWatch event whose messages are not MyCloudWatchBusiness JSON must fail validation."""
    raw_event = load_event("cloudWatchLogEvent.json")
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(raw_event, LambdaContext())


def test_handle_cloudwatch_trigger_event_no_envelope():
    """A real CloudWatch event parses cleanly with the raw CloudWatchLogsModel."""
    raw_event = load_event("cloudWatchLogEvent.json")
    handle_cloudwatch_logs_no_envelope(raw_event, LambdaContext())


def test_handle_invalid_event_with_envelope():
    """An empty event dict cannot be parsed by the CloudWatch envelope."""
    empty_event: dict = {}
    with pytest.raises(ValidationError):
        handle_cloudwatch_logs(event=empty_event, context=LambdaContext())