Skip to content

feat(parser): add support for parsing SQS events wrapped in Kinesis Firehose #2294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
KinesisFirehoseRecord,
KinesisFirehoseRecordMetadata,
)
from .kinesis_firehose_sqs import KinesisFirehoseSqsModel, KinesisFirehoseSqsRecord
from .lambda_function_url import LambdaFunctionUrlModel
from .s3 import (
S3EventNotificationEventBridgeDetailModel,
Expand Down Expand Up @@ -144,4 +145,6 @@
"KafkaRecordModel",
"KafkaMskEventModel",
"KafkaBaseEventModel",
"KinesisFirehoseSqsModel",
"KinesisFirehoseSqsRecord",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json
from typing import List, Optional

from pydantic import BaseModel, PositiveInt, validator

from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata

from .sqs import SqsRecordModel


class KinesisFirehoseSqsRecord(BaseModel):
data: SqsRecordModel
recordId: str
approximateArrivalTimestamp: PositiveInt
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]

@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
# Firehose payload is encoded twice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😱

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruben, I will validate this one more time before merging. But scare me too.

return json.loads(base64_decode(base64_decode(value)))


class KinesisFirehoseSqsModel(BaseModel):
invocationId: str
deliveryStreamArn: str
region: str
sourceKinesisStreamArn: Optional[str]
records: List[KinesisFirehoseSqsRecord]
1 change: 1 addition & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Parser comes with the following built-in models:
| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload |
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
| **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
| **S3Model** | Lambda Event Source payload for Amazon S3 |
Expand Down
12 changes: 12 additions & 0 deletions tests/events/kinesisFirehoseSQSEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9",
"deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE",
"region": "us-east-1",
"records": [
{
"recordId": "49640890555757999697367295315052682717311106012636250114000000",
"approximateArrivalTimestamp": 1684454259637,
"data": "ZXdvZ0lDQWdJQ0FpYldWemMyRm5aVWxrSWpvZ0lqQTFPV1l6Tm1JMExUZzNZVE10TkRSaFlpMDRNMlF5TFRZMk1UazNOVGd6TUdFM1pDSXNDaUFnSUNBZ0lDSnlaV05sYVhCMFNHRnVaR3hsSWpvZ0lrRlJSVUozU201TGVYSklhV2RWVFZwcU5uSlphV2REWjNoc1lWTXpVMHg1TUdFdUxpNGlMQW9nSUNBZ0lDQWlZbTlrZVNJNklDSlVaWE4wSUcxbGMzTmhaMlV1SWl3S0lDQWdJQ0FnSW1GMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJa0Z3Y0hKdmVHbHRZWFJsVW1WalpXbDJaVU52ZFc1MElqb2dJakVpTEFvZ0lDQWdJQ0FnSUNKVFpXNTBWR2x0WlhOMFlXMXdJam9nSWpFMU5EVXdPREkyTkRreE9ETWlMQW9nSUNBZ0lDQWdJQ0pUWlc1a1pYSkpaQ0k2SUNKQlNVUkJTVVZPVVZwS1QweFBNak5aVmtvMFZrOGlMQW9nSUNBZ0lDQWdJQ0pCY0hCeWIzaHBiV0YwWlVacGNuTjBVbVZqWldsMlpWUnBiV1Z6ZEdGdGNDSTZJQ0l4TlRRMU1EZ3lOalE1TVRnMUlnb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXVnpjMkZuWlVGMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJblJsYzNSQmRIUnlJam9nZXdvZ0lDQWdJQ0FnSUNBZ0luTjBjbWx1WjFaaGJIVmxJam9nSWpFd01DSXNDaUFnSUNBZ0lDQWdJQ0FpWW1sdVlYSjVWbUZzZFdVaU9pQWlZbUZ6WlRZMFUzUnlJaXdLSUNBZ0lDQWdJQ0FnSUNKa1lYUmhWSGx3WlNJNklDSk9kVzFpWlhJaUNpQWdJQ0FnSUNBZ2ZRb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXUTFUMlpDYjJSNUlqb2dJbVUwWlRZNFptSTNZbVF3WlRZNU4yRXdZV1U0WmpGaVlqTTBNamcwTm1Jeklpd0tJQ0FnSUNBZ0ltVjJaVzUwVTI5MWNtTmxJam9nSW1GM2N6cHpjWE1pTEFvZ0lDQWdJQ0FpWlhabGJuUlRiM1Z5WTJWQlVrNGlPaUFpWVhKdU9tRjNjenB6Y1hNNmRYTXRaV0Z6ZEMweU9qRXlNelExTmpjNE9UQXhNanB0ZVMxeGRXVjFaU0lzQ2lBZ0lDQWdJQ0poZDNOU1pXZHBiMjRpT2lBaWRYTXRaV0Z6ZEMweUlnb2dJQ0FnZlFvPQ=="
}
]
}
24 changes: 24 additions & 0 deletions tests/functional/parser/test_kinesis_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
KinesisFirehoseModel,
KinesisFirehoseRecord,
KinesisFirehoseRecordMetadata,
KinesisFirehoseSqsModel,
KinesisFirehoseSqsRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyKinesisFirehoseBusiness
Expand Down Expand Up @@ -77,6 +79,28 @@ def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContex
assert record_02.data == b'{"Hello": "World"}'


@event_parser(model=KinesisFirehoseSqsModel)
def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSqsModel, _: LambdaContext):
assert event.region == "us-east-1"
assert event.invocationId == "556b67a3-48fc-4385-af49-e133aade9cb9"
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE"

records = list(event.records)
assert len(records) == 1

record_01: KinesisFirehoseSqsRecord = records[0]
assert record_01.data.messageId == "059f36b4-87a3-44ab-83d2-661975830a7d"
assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..."
assert record_01.data.body == "Test message."
assert record_01.data.attributes.ApproximateReceiveCount == "1"
assert record_01.data.attributes.SenderId == "AIDAIENQZJOLO23YVJ4VO"


def test_firehose_sqs_wrapped_message_event():
event_dict = load_event("kinesisFirehoseSQSEvent.json")
handle_firehose_sqs_wrapped_message(event_dict, LambdaContext())


def test_firehose_trigger_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class
Expand Down