import logging
from typing import Any, Dict, List, Optional, Union

from ..models import KinesisDataStreamModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


# Re-exported by the `envelopes` package (`from .kinesis import KinesisDataStreamEnvelope`).
class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
        """Parses records found with model provided

        The event is first parsed with `KinesisDataStreamModel`; each record's
        `kinesis.data` bytes are then decoded as UTF-8 and handed to `self._parse`
        together with `model` (`_parse` is not defined in this module; presumably
        inherited from `BaseEnvelope`).

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Model
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided, in the order they appear in `Records`

        Raises
        ------
        UnicodeDecodeError
            If a record's payload is not valid UTF-8
        """
        logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
        # Kinesis records carry their payload under `kinesis.data`, not `body`
        logger.debug(f"Parsing Kinesis records in `data` with {model}")
        return [
            self._parse(data=record.kinesis.data.decode("utf-8"), model=model)
            for record in parsed_envelope.Records
        ]
import base64
import logging
from binascii import Error as BinAsciiError
from typing import List

from pydantic import BaseModel, validator
from pydantic.types import PositiveInt
from typing_extensions import Literal

logger = logging.getLogger(__name__)


class KinesisDataStreamRecordPayload(BaseModel):
    """The `kinesis` section of a single Kinesis Data Stream record."""

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Base64-decode the raw `data` value before the field itself is validated.

        Raises
        ------
        ValueError
            If `value` cannot be base64-decoded; the original decoding error is
            attached as the cause.
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Keep the try body to the one call that can fail
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as err:
            # Chain the cause so the underlying decode failure is not lost
            raise ValueError("base64 decode failed") from err


class KinesisDataStreamRecord(BaseModel):
    """A single entry of the `Records` list in a Kinesis Data Stream event."""

    eventSource: Literal["aws:kinesis"]
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str
    kinesis: KinesisDataStreamRecordPayload


class KinesisDataStreamModel(BaseModel):
    """Lambda Event Source payload for Amazon Kinesis Data Streams"""

    Records: List[KinesisDataStreamRecord]
fields parsed along the way. @@ -296,6 +297,7 @@ Envelope name | Behaviour | Return **EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`.
2. Parses `detail` key using your model and returns it. | `Model` **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and returns them in a list. | `List[Model]` **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and returns them in a list. | `List[Model]` +**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
"""Functional tests for the Kinesis Data Stream parser model and envelope."""
from typing import Any, Dict, List

import pytest

from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import (
    KinesisDataStreamModel,
    KinesisDataStreamRecord,
    KinesisDataStreamRecordPayload,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# MyKinesisBusiness is declared in tests/functional/parser/schemas.py as a
# pydantic BaseModel with two str fields: `message` and `username`.
from tests.functional.parser.schemas import MyKinesisBusiness
from tests.functional.parser.utils import load_event


def _kinesis_event(data: str) -> Dict[str, Any]:
    """Build a single-record Kinesis Data Stream event whose payload is `data`."""
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "1",
                    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                    "data": data,
                    "approximateArrivalTimestamp": 1545084650.987,
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
                "awsRegion": "us-east-2",
                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
            }
        ]
    }


@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)
def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
    """Handler using the envelope: receives the records already parsed with the business model."""
    assert len(event) == 1
    record: MyKinesisBusiness = event[0]
    assert record.message == "test message"
    assert record.username == "test"


@event_parser(model=KinesisDataStreamModel)
def handle_kinesis_no_envelope(event: KinesisDataStreamModel, _: LambdaContext):
    """Handler without envelope: receives the whole event parsed as KinesisDataStreamModel."""
    records = event.Records
    assert len(records) == 2
    record: KinesisDataStreamRecord = records[0]

    assert record.awsRegion == "us-east-2"
    assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
    assert record.eventName == "aws:kinesis:record"
    assert record.eventSource == "aws:kinesis"
    assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    assert record.eventVersion == "1.0"
    assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"

    kinesis: KinesisDataStreamRecordPayload = record.kinesis
    assert kinesis.approximateArrivalTimestamp == 1545084650.987
    assert kinesis.kinesisSchemaVersion == "1.0"
    assert kinesis.partitionKey == "1"
    assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
    assert kinesis.data == b"Hello, this is a test."


def test_kinesis_trigger_event():
    # Payload is the base64 form of the JSON document that handle_kinesis asserts on
    event_dict = _kinesis_event("eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=")

    handle_kinesis(event_dict, LambdaContext())


def test_kinesis_trigger_bad_base64_event():
    # "bad" is not decodable base64, so model validation must fail
    event_dict = _kinesis_event("bad")
    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(event_dict, LambdaContext())


def test_kinesis_trigger_event_no_envelope():
    event_dict = load_event("kinesisStreamEvent.json")
    handle_kinesis_no_envelope(event_dict, LambdaContext())


def test_validate_event_does_not_conform_with_model_no_envelope():
    event_dict: Any = {"hello": "s"}
    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(event_dict, LambdaContext())


def test_validate_event_does_not_conform_with_model():
    event_dict: Any = {"hello": "s"}
    with pytest.raises(ValidationError):
        handle_kinesis(event_dict, LambdaContext())