Skip to content

Commit ab47c92

Browse files
feat(parser): add support for parsing SQS events wrapped in Kinesis Firehose (#2294)
1 parent 5a4aa44 commit ab47c92

File tree

5 files changed

+69
-0
lines changed

5 files changed

+69
-0
lines changed

Diff for: aws_lambda_powertools/utilities/parser/models/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
KinesisFirehoseRecord,
4343
KinesisFirehoseRecordMetadata,
4444
)
45+
from .kinesis_firehose_sqs import KinesisFirehoseSqsModel, KinesisFirehoseSqsRecord
4546
from .lambda_function_url import LambdaFunctionUrlModel
4647
from .s3 import (
4748
S3EventNotificationEventBridgeDetailModel,
@@ -144,4 +145,6 @@
144145
"KafkaRecordModel",
145146
"KafkaMskEventModel",
146147
"KafkaBaseEventModel",
148+
"KinesisFirehoseSqsModel",
149+
"KinesisFirehoseSqsRecord",
147150
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import json
2+
from typing import List, Optional
3+
4+
from pydantic import BaseModel, PositiveInt, validator
5+
6+
from aws_lambda_powertools.shared.functions import base64_decode
7+
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata
8+
9+
from .sqs import SqsRecordModel
10+
11+
12+
class KinesisFirehoseSqsRecord(BaseModel):
13+
data: SqsRecordModel
14+
recordId: str
15+
approximateArrivalTimestamp: PositiveInt
16+
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]
17+
18+
@validator("data", pre=True, allow_reuse=True)
19+
def data_base64_decode(cls, value):
20+
# Firehose payload is encoded
21+
return json.loads(base64_decode(value))
22+
23+
24+
class KinesisFirehoseSqsModel(BaseModel):
25+
invocationId: str
26+
deliveryStreamArn: str
27+
region: str
28+
sourceKinesisStreamArn: Optional[str]
29+
records: List[KinesisFirehoseSqsRecord]

Diff for: docs/utilities/parser.md

+1
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ Parser comes with the following built-in models:
168168
| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload |
169169
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
170170
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
171+
| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
171172
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
172173
| **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
173174
| **S3Model** | Lambda Event Source payload for Amazon S3 |

Diff for: tests/events/kinesisFirehoseSQSEvent.json

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9",
3+
"deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE",
4+
"region": "us-east-1",
5+
"records": [
6+
{
7+
"recordId": "49640912821178817833517986466168945147170627572855734274000000",
8+
"approximateArrivalTimestamp": 1684864917398,
9+
"data": "eyJtZXNzYWdlSWQiOiI1YWI4MDdkNC01NjQ0LTRjNTUtOTdhMy00NzM5NjYzNWFjNzQiLCJyZWNlaXB0SGFuZGxlIjoiQVFFQndKbkt5ckhpZ1VNWmo2cllpZ0NneGxhUzNTTHkwYS4uLiIsImJvZHkiOiJUZXN0IG1lc3NhZ2UuIiwiYXR0cmlidXRlcyI6eyJBcHByb3hpbWF0ZVJlY2VpdmVDb3VudCI6IjEiLCJTZW50VGltZXN0YW1wIjoiMTY4NDg2NDg1MjQ5MSIsIlNlbmRlcklkIjoiQUlEQUlFTlFaSk9MTzIzWVZKNFZPIiwiQXBwcm94aW1hdGVGaXJzdFJlY2VpdmVUaW1lc3RhbXAiOiIxNjg0ODY0ODcyNDkxIn0sIm1lc3NhZ2VBdHRyaWJ1dGVzIjp7fSwibWQ1T2ZNZXNzYWdlQXR0cmlidXRlcyI6bnVsbCwibWQ1T2ZCb2R5IjoiYzhiNmJjNjBjOGI4YjNhOTA0ZTQ1YzFmYWJkZjUyM2QiLCJldmVudFNvdXJjZSI6ImF3czpzcXMiLCJldmVudFNvdXJjZUFSTiI6ImFybjphd3M6c3FzOnVzLWVhc3QtMToyMDA5ODQxMTIzODY6U05TIiwiYXdzUmVnaW9uIjoidXMtZWFzdC0xIn0K"
10+
}
11+
]
12+
}

Diff for: tests/functional/parser/test_kinesis_firehose.py

+24
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
KinesisFirehoseModel,
1212
KinesisFirehoseRecord,
1313
KinesisFirehoseRecordMetadata,
14+
KinesisFirehoseSqsModel,
15+
KinesisFirehoseSqsRecord,
1416
)
1517
from aws_lambda_powertools.utilities.typing import LambdaContext
1618
from tests.functional.parser.schemas import MyKinesisFirehoseBusiness
@@ -77,6 +79,28 @@ def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContex
7779
assert record_02.data == b'{"Hello": "World"}'
7880

7981

82+
@event_parser(model=KinesisFirehoseSqsModel)
83+
def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSqsModel, _: LambdaContext):
84+
assert event.region == "us-east-1"
85+
assert event.invocationId == "556b67a3-48fc-4385-af49-e133aade9cb9"
86+
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE"
87+
88+
records = list(event.records)
89+
assert len(records) == 1
90+
91+
record_01: KinesisFirehoseSqsRecord = records[0]
92+
assert record_01.data.messageId == "5ab807d4-5644-4c55-97a3-47396635ac74"
93+
assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..."
94+
assert record_01.data.body == "Test message."
95+
assert record_01.data.attributes.ApproximateReceiveCount == "1"
96+
assert record_01.data.attributes.SenderId == "AIDAIENQZJOLO23YVJ4VO"
97+
98+
99+
def test_firehose_sqs_wrapped_message_event():
100+
event_dict = load_event("kinesisFirehoseSQSEvent.json")
101+
handle_firehose_sqs_wrapped_message(event_dict, LambdaContext())
102+
103+
80104
def test_firehose_trigger_event():
81105
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
82106
event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class

0 commit comments

Comments
 (0)