diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py index 2212eb77e18..30070382d31 100644 --- a/aws_lambda_powertools/shared/functions.py +++ b/aws_lambda_powertools/shared/functions.py @@ -71,7 +71,7 @@ def resolve_env_var_choice( def base64_decode(value: str) -> bytes: try: - logger.debug("Decoding base64 Kafka record item before parsing") + logger.debug("Decoding base64 record item before parsing") return base64.b64decode(value) except (BinAsciiError, TypeError): raise ValueError("base64 decode failed") diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index 4b0e4c943a2..0f985f29d88 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -6,6 +6,7 @@ from .event_bridge import EventBridgeEnvelope from .kafka import KafkaEnvelope from .kinesis import KinesisDataStreamEnvelope +from .kinesis_firehose import KinesisFirehoseEnvelope from .lambda_function_url import LambdaFunctionUrlEnvelope from .sns import SnsEnvelope, SnsSqsEnvelope from .sqs import SqsEnvelope @@ -17,6 +18,7 @@ "DynamoDBStreamEnvelope", "EventBridgeEnvelope", "KinesisDataStreamEnvelope", + "KinesisFirehoseEnvelope", "LambdaFunctionUrlEnvelope", "SnsEnvelope", "SnsSqsEnvelope", diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index 9ff221a7b7b..24104ebd40c 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope): Regardless of its type it'll be parsed into a BaseModel object. 
Note: Records will be parsed the same way so if model is str, - all items in the list will be parsed as str and npt as JSON (and vice versa) + all items in the list will be parsed as str and not as JSON (and vice versa) """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py new file mode 100644 index 00000000000..c8dd936512c --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py @@ -0,0 +1,47 @@ +import logging +from typing import Any, Dict, List, Optional, Type, Union, cast + +from ..models import KinesisFirehoseModel +from ..types import Model +from .base import BaseEnvelope + +logger = logging.getLogger(__name__) + + +class KinesisFirehoseEnvelope(BaseEnvelope): + """Kinesis Firehose Envelope to extract array of Records + + The record's data parameter is a base64 encoded string which is parsed into a bytes array, + though it can also be a JSON encoded string. + Regardless of its type it'll be parsed into a BaseModel object. 
+ + Note: Records will be parsed the same way so if model is str, + all items in the list will be parsed as str and not as JSON (and vice versa) + + https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html + """ + + def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: + """Parses records found with model provided + + Parameters + ---------- + data : Dict + Lambda event to be parsed + model : Type[Model] + Data model provided to parse after extracting data using envelope + + Returns + ------- + List + List of records parsed with model provided + """ + logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}") + parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data) + logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}") + models = [] + for record in parsed_envelope.records: + # We allow either AWS expected contract (bytes) or a custom Model, see #943 + data = cast(bytes, record.data) + models.append(self._parse(data=data.decode("utf-8"), model=model)) + return models diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 52059cb9ee7..62e28a62374 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -37,6 +37,11 @@ KinesisDataStreamRecord, KinesisDataStreamRecordPayload, ) +from .kinesis_firehose import ( + KinesisFirehoseModel, + KinesisFirehoseRecord, + KinesisFirehoseRecordMetadata, +) from .lambda_function_url import LambdaFunctionUrlModel from .s3 import S3Model, S3RecordModel from .s3_object_event import ( @@ -86,6 +91,9 @@ "KinesisDataStreamModel", "KinesisDataStreamRecord", "KinesisDataStreamRecordPayload", + "KinesisFirehoseModel", + "KinesisFirehoseRecord", + "KinesisFirehoseRecordMetadata", "LambdaFunctionUrlModel", "S3Model", "S3RecordModel", diff 
--git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py index be868ca44ba..ffc89bcbdaa 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py @@ -1,14 +1,10 @@ -import base64 -import logging -from binascii import Error as BinAsciiError from typing import List, Type, Union from pydantic import BaseModel, validator +from aws_lambda_powertools.shared.functions import base64_decode from aws_lambda_powertools.utilities.parser.types import Literal -logger = logging.getLogger(__name__) - class KinesisDataStreamRecordPayload(BaseModel): kinesisSchemaVersion: str @@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel): @validator("data", pre=True, allow_reuse=True) def data_base64_decode(cls, value): - try: - logger.debug("Decoding base64 Kinesis data record before parsing") - return base64.b64decode(value) - except (BinAsciiError, TypeError): - raise ValueError("base64 decode failed") + return base64_decode(value) class KinesisDataStreamRecord(BaseModel): diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py new file mode 100644 index 00000000000..c59d8c680e5 --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py @@ -0,0 +1,32 @@ +from typing import List, Optional, Type, Union + +from pydantic import BaseModel, PositiveInt, validator + +from aws_lambda_powertools.shared.functions import base64_decode + + +class KinesisFirehoseRecordMetadata(BaseModel): + shardId: str + partitionKey: str + approximateArrivalTimestamp: PositiveInt + sequenceNumber: str + subsequenceNumber: str + + +class KinesisFirehoseRecord(BaseModel): + data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes + recordId: str + approximateArrivalTimestamp: PositiveInt + kinesisRecordMetadata: 
Optional[KinesisFirehoseRecordMetadata] + + @validator("data", pre=True, allow_reuse=True) + def data_base64_decode(cls, value): + return base64_decode(value) + + +class KinesisFirehoseModel(BaseModel): + invocationId: str + deliveryStreamArn: str + region: str + sourceKinesisStreamArn: Optional[str] + records: List[KinesisFirehoseRecord] diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index cdcb949d28a..48c244c8df2 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -163,6 +163,7 @@ Parser comes with the following built-in models: | **S3Model** | Lambda Event Source payload for Amazon S3 | | **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda | | **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams | +| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | | **SesModel** | Lambda Event Source payload for Amazon Simple Email Service | | **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service | | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway | @@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return | **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` | | **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]` | | **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` | +| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it.
2. Parses records in `records` key using your model and returns them in a list. | `List[Model]` | | **SnsEnvelope** | 1. Parses data using `SnsModel`.<br/>
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` | | **SnsSqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses SNS records in `body` key using `SnsNotificationModel`.
3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` | | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`.
2. Parses `body` key using your model and returns it. | `Model` | diff --git a/tests/events/kinesisFirehoseKinesisEvent.json b/tests/events/kinesisFirehoseKinesisEvent.json new file mode 100644 index 00000000000..5120dd57ccb --- /dev/null +++ b/tests/events/kinesisFirehoseKinesisEvent.json @@ -0,0 +1,32 @@ +{ + "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source", + "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", + "region": "us-east-2", + "records": [ + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record1", + "approximateArrivalTimestamp": 1664028820148, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000000", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", + "approximateArrivalTimestamp": 1664028820148, + "sequenceNumber": "49546986683135544286507457936321625675700192471156785154", + "subsequenceNumber": "" + } + }, + { + "data": "eyJIZWxsbyI6ICJXb3JsZCJ9", + "recordId": "record2", + "approximateArrivalTimestamp": 1664028793294, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000001", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", + "approximateArrivalTimestamp": 1664028793294, + "sequenceNumber": "49546986683135544286507457936321625675700192471156785155", + "subsequenceNumber": "" + } + } + ] +} diff --git a/tests/events/kinesisFirehosePutEvent.json b/tests/events/kinesisFirehosePutEvent.json new file mode 100644 index 00000000000..27aeddd80eb --- /dev/null +++ b/tests/events/kinesisFirehosePutEvent.json @@ -0,0 +1,17 @@ +{ + "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", + "region": "us-east-2", + "records":[ + { + "recordId":"record1", + "approximateArrivalTimestamp":1664029185290, + "data":"SGVsbG8gV29ybGQ=" + }, + { + "recordId":"record2", + 
"approximateArrivalTimestamp":1664029186945, + "data":"eyJIZWxsbyI6ICJXb3JsZCJ9" + } + ] + } diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py index b1b66c63379..907deb40aa0 100644 --- a/tests/functional/parser/schemas.py +++ b/tests/functional/parser/schemas.py @@ -95,3 +95,7 @@ class MyALambdaFuncUrlBusiness(BaseModel): class MyLambdaKafkaBusiness(BaseModel): key: str + + +class MyKinesisFirehoseBusiness(BaseModel): + Hello: str diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py new file mode 100644 index 00000000000..59bbd2f4e18 --- /dev/null +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -0,0 +1,114 @@ +from typing import List + +import pytest + +from aws_lambda_powertools.utilities.parser import ( + ValidationError, + envelopes, + event_parser, +) +from aws_lambda_powertools.utilities.parser.models import ( + KinesisFirehoseModel, + KinesisFirehoseRecord, + KinesisFirehoseRecordMetadata, +) +from aws_lambda_powertools.utilities.typing import LambdaContext +from tests.functional.parser.schemas import MyKinesisFirehoseBusiness +from tests.functional.utils import load_event + + +@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope) +def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext): + assert len(event) == 1 + assert event[0].Hello == "World" + + +@event_parser(model=KinesisFirehoseModel) +def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext): + assert event.region == "us-east-2" + assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source" + + records = list(event.records) + assert len(records) == 2 + record_01: KinesisFirehoseRecord = 
records[0] + assert record_01.approximateArrivalTimestamp == 1664028820148 + assert record_01.recordId == "record1" + assert record_01.data == b"Hello World" + + metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata + assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" + assert metadata_01.subsequenceNumber == "" + assert metadata_01.shardId == "shardId-000000000000" + assert metadata_01.approximateArrivalTimestamp == 1664028820148 + assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154" + + record_02: KinesisFirehoseRecord = records[1] + assert record_02.approximateArrivalTimestamp == 1664028793294 + assert record_02.recordId == "record2" + assert record_02.data == b'{"Hello": "World"}' + + metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata + assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" + assert metadata_02.subsequenceNumber == "" + assert metadata_02.shardId == "shardId-000000000001" + assert metadata_02.approximateArrivalTimestamp == 1664028793294 + assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155" + + +@event_parser(model=KinesisFirehoseModel) +def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext): + assert event.region == "us-east-2" + assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + + records = list(event.records) + assert len(records) == 2 + + record_01: KinesisFirehoseRecord = records[0] + assert record_01.approximateArrivalTimestamp == 1664029185290 + assert record_01.recordId == "record1" + assert record_01.data == b"Hello World" + + record_02: KinesisFirehoseRecord = records[1] + assert record_02.approximateArrivalTimestamp == 1664029186945 + assert record_02.recordId == "record2" + assert record_02.data == 
b'{"Hello": "World"}' + + +def test_firehose_trigger_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class + handle_firehose(event_dict, LambdaContext()) + + +def test_firehose_trigger_event_kinesis_no_envelope(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) + + +def test_firehose_trigger_event_put_no_envelope(): + event_dict = load_event("kinesisFirehosePutEvent.json") + handle_firehose_no_envelope_put(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_base64_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"][0]["data"] = {"bad base64"} + with pytest.raises(ValidationError): + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_timestamp_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"][0]["approximateArrivalTimestamp"] = -1 + with pytest.raises(ValidationError): + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_metadata_timestamp_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1" + with pytest.raises(ValidationError): + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())