diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 5f7a8a6b550..c2385b7bf14 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -42,6 +42,7 @@ KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, ) +from .kinesis_firehose_sqs import KinesisFirehoseSqsModel, KinesisFirehoseSqsRecord from .lambda_function_url import LambdaFunctionUrlModel from .s3 import ( S3EventNotificationEventBridgeDetailModel, @@ -144,4 +145,6 @@ "KafkaRecordModel", "KafkaMskEventModel", "KafkaBaseEventModel", + "KinesisFirehoseSqsModel", + "KinesisFirehoseSqsRecord", ] diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py new file mode 100644 index 00000000000..b649828853b --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py @@ -0,0 +1,29 @@ +import json +from typing import List, Optional + +from pydantic import BaseModel, PositiveInt, validator + +from aws_lambda_powertools.shared.functions import base64_decode +from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata + +from .sqs import SqsRecordModel + + +class KinesisFirehoseSqsRecord(BaseModel): + data: SqsRecordModel + recordId: str + approximateArrivalTimestamp: PositiveInt + kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] + + @validator("data", pre=True, allow_reuse=True) + def data_base64_decode(cls, value): + # Firehose payload is encoded + return json.loads(base64_decode(value)) + + +class KinesisFirehoseSqsModel(BaseModel): + invocationId: str + deliveryStreamArn: str + region: str + sourceKinesisStreamArn: Optional[str] + records: List[KinesisFirehoseSqsRecord] diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index 38e12c0792d..6607e7b07b0 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -168,6 +168,7 @@ Parser comes with the following built-in models: | **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload | | **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams | | **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | +| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records | | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload | | **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. | | **S3Model** | Lambda Event Source payload for Amazon S3 | diff --git a/tests/events/kinesisFirehoseSQSEvent.json b/tests/events/kinesisFirehoseSQSEvent.json new file mode 100644 index 00000000000..bea267c4206 --- /dev/null +++ b/tests/events/kinesisFirehoseSQSEvent.json @@ -0,0 +1,12 @@ +{ + "invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9", + "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE", + "region": "us-east-1", + "records": [ + { + "recordId": "49640912821178817833517986466168945147170627572855734274000000", + "approximateArrivalTimestamp": 1684864917398, + "data": "eyJtZXNzYWdlSWQiOiI1YWI4MDdkNC01NjQ0LTRjNTUtOTdhMy00NzM5NjYzNWFjNzQiLCJyZWNlaXB0SGFuZGxlIjoiQVFFQndKbkt5ckhpZ1VNWmo2cllpZ0NneGxhUzNTTHkwYS4uLiIsImJvZHkiOiJUZXN0IG1lc3NhZ2UuIiwiYXR0cmlidXRlcyI6eyJBcHByb3hpbWF0ZVJlY2VpdmVDb3VudCI6IjEiLCJTZW50VGltZXN0YW1wIjoiMTY4NDg2NDg1MjQ5MSIsIlNlbmRlcklkIjoiQUlEQUlFTlFaSk9MTzIzWVZKNFZPIiwiQXBwcm94aW1hdGVGaXJzdFJlY2VpdmVUaW1lc3RhbXAiOiIxNjg0ODY0ODcyNDkxIn0sIm1lc3NhZ2VBdHRyaWJ1dGVzIjp7fSwibWQ1T2ZNZXNzYWdlQXR0cmlidXRlcyI6bnVsbCwibWQ1T2ZCb2R5IjoiYzhiNmJjNjBjOGI4YjNhOTA0ZTQ1YzFmYWJkZjUyM2QiLCJldmVudFNvdXJjZSI6ImF3czpzcXMiLCJldmVudFNvdXJjZUFSTiI6ImFybjphd3M6c3FzOnVzLWVhc3QtMToyMDA5ODQxMTIzODY6U05TIiwiYXdzUmVnaW9uIjoidXMtZWFzdC0xIn0K" + } + ] +} diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py index 59bbd2f4e18..c0b71f80540 100644 --- a/tests/functional/parser/test_kinesis_firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -11,6 +11,8 @@ KinesisFirehoseModel, KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, + KinesisFirehoseSqsModel, + KinesisFirehoseSqsRecord, ) from aws_lambda_powertools.utilities.typing import LambdaContext from tests.functional.parser.schemas import MyKinesisFirehoseBusiness @@ -77,6 +79,28 @@ def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContex assert record_02.data == b'{"Hello": "World"}' +@event_parser(model=KinesisFirehoseSqsModel) +def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSqsModel, _: LambdaContext): + assert event.region == "us-east-1" + assert event.invocationId == "556b67a3-48fc-4385-af49-e133aade9cb9" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE" + + records = list(event.records) + assert len(records) == 1 + + record_01: KinesisFirehoseSqsRecord = records[0] + assert record_01.data.messageId == "5ab807d4-5644-4c55-97a3-47396635ac74" + assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..." + assert record_01.data.body == "Test message." + assert record_01.data.attributes.ApproximateReceiveCount == "1" + assert record_01.data.attributes.SenderId == "AIDAIENQZJOLO23YVJ4VO" + + +def test_firehose_sqs_wrapped_message_event(): + event_dict = load_event("kinesisFirehoseSQSEvent.json") + handle_firehose_sqs_wrapped_message(event_dict, LambdaContext()) + + def test_firehose_trigger_event(): event_dict = load_event("kinesisFirehoseKinesisEvent.json") event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class