From 836c066add5722bc08ee0c5dff743561a05bff63 Mon Sep 17 00:00:00 2001 From: Ran Isenberg Date: Mon, 30 Nov 2020 15:55:52 +0200 Subject: [PATCH 1/5] feat: Add cloudwatch lambda event support to Parser utility --- .../utilities/parser/envelopes/__init__.py | 10 ++- .../utilities/parser/envelopes/cloudwatch.py | 42 +++++++++++ .../utilities/parser/models/__init__.py | 5 ++ .../utilities/parser/models/cloudwatch.py | 39 ++++++++++ tests/functional/parser/schemas.py | 5 ++ tests/functional/parser/test_cloudwatch.py | 74 +++++++++++++++++++ 6 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py create mode 100644 aws_lambda_powertools/utilities/parser/models/cloudwatch.py create mode 100644 tests/functional/parser/test_cloudwatch.py diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index 4be73363b0f..dc1c51d71c2 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -1,7 +1,15 @@ from .base import BaseEnvelope +from .cloudwatch import CloudatchEnvelope from .dynamodb import DynamoDBStreamEnvelope from .event_bridge import EventBridgeEnvelope from .sns import SnsEnvelope from .sqs import SqsEnvelope -__all__ = ["DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", "SqsEnvelope", "BaseEnvelope"] +__all__ = [ + "CloudatchEnvelope", + "DynamoDBStreamEnvelope", + "EventBridgeEnvelope", + "SnsEnvelope", + "SqsEnvelope", + "BaseEnvelope", +] diff --git a/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py new file mode 100644 index 00000000000..74f9d6acc4e --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py @@ -0,0 +1,42 @@ +import logging +from typing import Any, Dict, List, Optional, Union + +from ..models import CloudWatchLogsModel +from ..types import Model +from .base import BaseEnvelope + +logger = logging.getLogger(__name__) + + +class CloudatchEnvelope(BaseEnvelope): + """Cloudatch Envelope to extract a List of log records. + + The record's body parameter is a string (after being base64 decoded and gzipped), + though it can also be a JSON encoded string. + Regardless of its type it'll be parsed into a BaseModel object. + + Note: The record will be parsed the same way so if model is str + """ + + def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]: + """Parses records found with model provided + + Parameters + ---------- + data : Dict + Lambda event to be parsed + model : Model + Data model provided to parse after extracting data using envelope + + Returns + ------- + List + List of records parsed with model provided + """ + logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}") + parsed_envelope = CloudWatchLogsModel.parse_obj(data) + logger.debug(f"Parsing CloudWatch records in `body` with {model}") + output = [] + for record in parsed_envelope.awslogs.decoded_data.logEvents: + output.append(self._parse(data=record.message, model=model)) + return output diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 36ba05240b0..aa4a9e83e35 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -1,3 +1,4 @@ +from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel from .event_bridge import EventBridgeModel from .ses import SesModel, SesRecordModel @@ -5,6 +6,10 @@ from .sqs import SqsModel, SqsRecordModel __all__ = [ + "CloudWatchLogsData", + "CloudWatchLogsDecode", + "CloudWatchLogsLogEvent", + "CloudWatchLogsModel", "DynamoDBStreamModel", "EventBridgeModel", "DynamoDBStreamChangedRecordModel", diff --git a/aws_lambda_powertools/utilities/parser/models/cloudwatch.py b/aws_lambda_powertools/utilities/parser/models/cloudwatch.py new file mode 100644 index 00000000000..d43cde4e793 --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/cloudwatch.py @@ -0,0 +1,39 @@ +import base64 +import json +import zlib +from datetime import datetime +from typing import List + +from pydantic import BaseModel, Field, validator + + +class CloudWatchLogsLogEvent(BaseModel): + id: str # noqa AA03 VNE003 + timestamp: datetime + message: str + + +class CloudWatchLogsDecode(BaseModel): + messageType: str + owner: str + logGroup: str + logStream: str + subscriptionFilters: List[str] + logEvents: List[CloudWatchLogsLogEvent] + + +class CloudWatchLogsData(BaseModel): + decoded_data: CloudWatchLogsDecode = Field(None, alias="data") + + @validator("decoded_data", pre=True) + def prepare_data(cls, value): + try: + payload = base64.b64decode(value) + uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32) + return json.loads(uncompressed.decode("UTF-8")) + except Exception: + raise ValueError("unable to decompress data") + + +class CloudWatchLogsModel(BaseModel): + awslogs: CloudWatchLogsData diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py index bfc601e3537..23614a138b4 100644 --- a/tests/functional/parser/schemas.py +++ b/tests/functional/parser/schemas.py @@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel): class MyAdvancedSnsBusiness(SnsModel): Records: List[MyAdvancedSnsRecordModel] + + +class MyCloudWatchBusiness(BaseModel): + my_message: str + user: str diff --git a/tests/functional/parser/test_cloudwatch.py b/tests/functional/parser/test_cloudwatch.py new file mode 100644 index 00000000000..786ebebb89b --- /dev/null +++ b/tests/functional/parser/test_cloudwatch.py @@ -0,0 +1,74 @@ +import base64 +import json +import zlib +from typing import List + +import pytest + +from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser +from aws_lambda_powertools.utilities.parser.models import CloudWatchLogsLogEvent, CloudWatchLogsModel +from aws_lambda_powertools.utilities.typing import LambdaContext +from tests.functional.parser.schemas import MyCloudWatchBusiness +from tests.functional.parser.utils import load_event + + +@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudatchEnvelope) +def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext): + assert len(event) == 1 + log: MyCloudWatchBusiness = event[0] + assert log.my_message == "hello" + assert log.user == "test" + + +@event_parser(model=CloudWatchLogsModel) +def handle_cloudwatch_logs_no_envelope(event: CloudWatchLogsModel, _: LambdaContext): + assert event.awslogs.decoded_data.owner == "123456789123" + assert event.awslogs.decoded_data.logGroup == "testLogGroup" + assert event.awslogs.decoded_data.logStream == "testLogStream" + assert event.awslogs.decoded_data.subscriptionFilters == ["testFilter"] + assert event.awslogs.decoded_data.messageType == "DATA_MESSAGE" + + assert len(event.awslogs.decoded_data.logEvents) == 2 + log_record: CloudWatchLogsLogEvent = event.awslogs.decoded_data.logEvents[0] + assert log_record.id == "eventId1" + convert_time = int(round(log_record.timestamp.timestamp() * 1000)) + assert convert_time == 1440442987000 + assert log_record.message == "[ERROR] First test message" + log_record: CloudWatchLogsLogEvent = event.awslogs.decoded_data.logEvents[1] + assert log_record.id == "eventId2" + convert_time = int(round(log_record.timestamp.timestamp() * 1000)) + assert convert_time == 1440442987001 + assert log_record.message == "[ERROR] Second test message" + + +def test_validate_event_user_model_with_envelope(): + my_log_message = {"my_message": "hello", "user": "test"} + inner_event_dict = { + "messageType": "DATA_MESSAGE", + "owner": "123456789123", + "logGroup": "testLogGroup", + "logStream": "testLogStream", + "subscriptionFilters": ["testFilter"], + "logEvents": [{"id": "eventId1", "timestamp": 1440442987000, "message": json.dumps(my_log_message)}], + } + dict_str = json.dumps(inner_event_dict) + compressesd_str = zlib.compress(str.encode(dict_str), -1) + event_dict = {"awslogs": {"data": base64.b64encode(compressesd_str)}} + + handle_cloudwatch_logs(event_dict, LambdaContext()) + + +def test_validate_event_does_not_conform_with_user_dict_model(): + event_dict = load_event("cloudWatchLogEvent.json") + with pytest.raises(ValidationError): + handle_cloudwatch_logs(event_dict, LambdaContext()) + + +def test_handle_cloudwatch_trigger_event_no_envelope(): + event_dict = load_event("cloudWatchLogEvent.json") + handle_cloudwatch_logs_no_envelope(event_dict, LambdaContext()) + + +def test_handle_invalid_event_with_envelope(): + with pytest.raises(ValidationError): + handle_cloudwatch_logs(event={}, context=LambdaContext()) From b95a833d05e100160fdd93f649d8c17d77aeed17 Mon Sep 17 00:00:00 2001 From: Ran Isenberg Date: Tue, 1 Dec 2020 16:06:55 +0200 Subject: [PATCH 2/5] cr fixes --- .../utilities/parser/envelopes/__init__.py | 4 ++-- .../utilities/parser/envelopes/cloudwatch.py | 2 +- .../utilities/parser/models/cloudwatch.py | 7 ++++++- tests/functional/parser/test_cloudwatch.py | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index dc1c51d71c2..3338dd661d0 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -1,12 +1,12 @@ from .base import BaseEnvelope -from .cloudwatch import CloudatchEnvelope +from .cloudwatch import CloudatchLogsEnvelope from .dynamodb import DynamoDBStreamEnvelope from .event_bridge import EventBridgeEnvelope from .sns import SnsEnvelope from .sqs import SqsEnvelope __all__ = [ - "CloudatchEnvelope", + "CloudatchLogsEnvelope", "DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", diff --git a/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py index 74f9d6acc4e..163e32b837a 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -class CloudatchEnvelope(BaseEnvelope): +class CloudatchLogsEnvelope(BaseEnvelope): """Cloudatch Envelope to extract a List of log records. The record's body parameter is a string (after being base64 decoded and gzipped), diff --git a/aws_lambda_powertools/utilities/parser/models/cloudwatch.py b/aws_lambda_powertools/utilities/parser/models/cloudwatch.py index d43cde4e793..26eeef5b56f 100644 --- a/aws_lambda_powertools/utilities/parser/models/cloudwatch.py +++ b/aws_lambda_powertools/utilities/parser/models/cloudwatch.py @@ -1,11 +1,14 @@ import base64 import json +import logging import zlib from datetime import datetime from typing import List from pydantic import BaseModel, Field, validator +logger = logging.getLogger(__name__) + class CloudWatchLogsLogEvent(BaseModel): id: str # noqa AA03 VNE003 @@ -28,9 +31,11 @@ class CloudWatchLogsData(BaseModel): @validator("decoded_data", pre=True) def prepare_data(cls, value): try: + logger.debug("Decoding base64 cloudwatch log data before parsing") payload = base64.b64decode(value) + logger.debug("Decompressing cloudwatch log data before parsing") uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32) - return json.loads(uncompressed.decode("UTF-8")) + return json.loads(uncompressed.decode("utf-8")) except Exception: raise ValueError("unable to decompress data") diff --git a/tests/functional/parser/test_cloudwatch.py b/tests/functional/parser/test_cloudwatch.py index 786ebebb89b..c4083e42631 100644 --- a/tests/functional/parser/test_cloudwatch.py +++ b/tests/functional/parser/test_cloudwatch.py @@ -12,7 +12,7 @@ from tests.functional.parser.utils import load_event -@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudatchEnvelope) +@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudatchLogsEnvelope) def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext): assert len(event) == 1 log: MyCloudWatchBusiness = event[0] From 647f903b70667ee69c6ba9b26a0e3a152509c09c Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 4 Dec 2020 14:00:22 +0100 Subject: [PATCH 3/5] docs: add CW Logs as a supported model --- docs/content/utilities/parser.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/utilities/parser.mdx b/docs/content/utilities/parser.mdx index 0c571bc257a..91cf573bf77 100644 --- a/docs/content/utilities/parser.mdx +++ b/docs/content/utilities/parser.mdx @@ -156,6 +156,7 @@ Model name | Description **DynamoDBStreamModel** | Lambda Event Source payload for Amazon DynamoDB Streams **EventBridgeModel** | Lambda Event Source payload for Amazon EventBridge **SqsModel** | Lambda Event Source payload for Amazon SQS +**CloudwatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs You can extend them to include your own models, and yet have all other known fields parsed along the way. From d07db543d05b5e2ff54967848dfac630b0954a4a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 4 Dec 2020 14:16:25 +0100 Subject: [PATCH 4/5] fix: cloudwatch logs envelope typo --- .../utilities/parser/envelopes/__init__.py | 4 ++-- .../utilities/parser/envelopes/cloudwatch.py | 6 +++--- tests/functional/parser/test_cloudwatch.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index 3338dd661d0..8f68f0cca36 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -1,12 +1,12 @@ from .base import BaseEnvelope -from .cloudwatch import CloudatchLogsEnvelope +from .cloudwatch import CloudWatchLogsEnvelope from .dynamodb import DynamoDBStreamEnvelope from .event_bridge import EventBridgeEnvelope from .sns import SnsEnvelope from .sqs import SqsEnvelope __all__ = [ - "CloudatchLogsEnvelope", + "CloudWatchLogsEnvelope", "DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", diff --git a/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py index 163e32b837a..e4ecdd8b5ac 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/cloudwatch.py @@ -8,10 +8,10 @@ logger = logging.getLogger(__name__) -class CloudatchLogsEnvelope(BaseEnvelope): - """Cloudatch Envelope to extract a List of log records. +class CloudWatchLogsEnvelope(BaseEnvelope): + """CloudWatch Envelope to extract a List of log records. - The record's body parameter is a string (after being base64 decoded and gzipped), + The record's body parameter is a string (after being base64 decoded and gzipped), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. diff --git a/tests/functional/parser/test_cloudwatch.py b/tests/functional/parser/test_cloudwatch.py index c4083e42631..dd7fd503f39 100644 --- a/tests/functional/parser/test_cloudwatch.py +++ b/tests/functional/parser/test_cloudwatch.py @@ -12,7 +12,7 @@ from tests.functional.parser.utils import load_event -@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudatchLogsEnvelope) +@event_parser(model=MyCloudWatchBusiness, envelope=envelopes.CloudWatchLogsEnvelope) def handle_cloudwatch_logs(event: List[MyCloudWatchBusiness], _: LambdaContext): assert len(event) == 1 log: MyCloudWatchBusiness = event[0] From ccd2a006697de01ea5f41b910790b636682fa646 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 4 Dec 2020 14:16:40 +0100 Subject: [PATCH 5/5] docs: add CW Logs as a supported envelope --- docs/content/utilities/parser.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/utilities/parser.mdx b/docs/content/utilities/parser.mdx index 9de8db35e0a..29f81336d5f 100644 --- a/docs/content/utilities/parser.mdx +++ b/docs/content/utilities/parser.mdx @@ -294,6 +294,7 @@ Envelope name | Behaviour | Return **DynamoDBStreamEnvelope** | 1. Parses data using `DynamoDBStreamModel`.
2. Parses records in `NewImage` and `OldImage` keys using your model.
3. Returns a list with a dictionary containing `NewImage` and `OldImage` keys | `List[Dict[str, Optional[Model]]]` **EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`.
2. Parses `detail` key using your model and returns it. | `Model` **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` +**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]` ### Bringing your own envelope