diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py
index 2212eb77e18..30070382d31 100644
--- a/aws_lambda_powertools/shared/functions.py
+++ b/aws_lambda_powertools/shared/functions.py
@@ -71,7 +71,7 @@ def resolve_env_var_choice(
def base64_decode(value: str) -> bytes:
try:
- logger.debug("Decoding base64 Kafka record item before parsing")
+ logger.debug("Decoding base64 record item before parsing")
return base64.b64decode(value)
except (BinAsciiError, TypeError):
raise ValueError("base64 decode failed")
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
index 4b0e4c943a2..0f985f29d88 100644
--- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
@@ -6,6 +6,7 @@
from .event_bridge import EventBridgeEnvelope
from .kafka import KafkaEnvelope
from .kinesis import KinesisDataStreamEnvelope
+from .kinesis_firehose import KinesisFirehoseEnvelope
from .lambda_function_url import LambdaFunctionUrlEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope
@@ -17,6 +18,7 @@
"DynamoDBStreamEnvelope",
"EventBridgeEnvelope",
"KinesisDataStreamEnvelope",
+ "KinesisFirehoseEnvelope",
"LambdaFunctionUrlEnvelope",
"SnsEnvelope",
"SnsSqsEnvelope",
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
index 9ff221a7b7b..24104ebd40c 100644
--- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
+++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
@@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope):
Regardless of its type it'll be parsed into a BaseModel object.
Note: Records will be parsed the same way so if model is str,
- all items in the list will be parsed as str and npt as JSON (and vice versa)
+ all items in the list will be parsed as str and not as JSON (and vice versa)
"""
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py
new file mode 100644
index 00000000000..c8dd936512c
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py
@@ -0,0 +1,47 @@
+import logging
+from typing import Any, Dict, List, Optional, Type, Union, cast
+
+from ..models import KinesisFirehoseModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KinesisFirehoseEnvelope(BaseEnvelope):
+ """Kinesis Firehose Envelope to extract array of Records
+
+ The record's data parameter is a base64 encoded string which is parsed into a bytes array,
+ though it can also be a JSON encoded string.
+ Regardless of its type it'll be parsed into a BaseModel object.
+
+ Note: Records will be parsed the same way so if model is str,
+ all items in the list will be parsed as str and not as JSON (and vice versa)
+
+ https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
+ """
+
+ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
+ """Parses records found with model provided
+
+ Parameters
+ ----------
+ data : Dict
+ Lambda event to be parsed
+ model : Type[Model]
+ Data model provided to parse after extracting data using envelope
+
+ Returns
+ -------
+ List
+ List of records parsed with model provided
+ """
+ logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
+ parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
+ logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}")
+ models = []
+ for record in parsed_envelope.records:
+ # We allow either AWS expected contract (bytes) or a custom Model, see #943
+ data = cast(bytes, record.data)
+ models.append(self._parse(data=data.decode("utf-8"), model=model))
+ return models
diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py
index 52059cb9ee7..62e28a62374 100644
--- a/aws_lambda_powertools/utilities/parser/models/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -37,6 +37,11 @@
KinesisDataStreamRecord,
KinesisDataStreamRecordPayload,
)
+from .kinesis_firehose import (
+ KinesisFirehoseModel,
+ KinesisFirehoseRecord,
+ KinesisFirehoseRecordMetadata,
+)
from .lambda_function_url import LambdaFunctionUrlModel
from .s3 import S3Model, S3RecordModel
from .s3_object_event import (
@@ -86,6 +91,9 @@
"KinesisDataStreamModel",
"KinesisDataStreamRecord",
"KinesisDataStreamRecordPayload",
+ "KinesisFirehoseModel",
+ "KinesisFirehoseRecord",
+ "KinesisFirehoseRecordMetadata",
"LambdaFunctionUrlModel",
"S3Model",
"S3RecordModel",
diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py
index be868ca44ba..ffc89bcbdaa 100644
--- a/aws_lambda_powertools/utilities/parser/models/kinesis.py
+++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py
@@ -1,14 +1,10 @@
-import base64
-import logging
-from binascii import Error as BinAsciiError
from typing import List, Type, Union
from pydantic import BaseModel, validator
+from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.types import Literal
-logger = logging.getLogger(__name__)
-
class KinesisDataStreamRecordPayload(BaseModel):
kinesisSchemaVersion: str
@@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel):
@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
- try:
- logger.debug("Decoding base64 Kinesis data record before parsing")
- return base64.b64decode(value)
- except (BinAsciiError, TypeError):
- raise ValueError("base64 decode failed")
+ return base64_decode(value)
class KinesisDataStreamRecord(BaseModel):
diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py
new file mode 100644
index 00000000000..c59d8c680e5
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py
@@ -0,0 +1,32 @@
+from typing import List, Optional, Type, Union
+
+from pydantic import BaseModel, PositiveInt, validator
+
+from aws_lambda_powertools.shared.functions import base64_decode
+
+
+class KinesisFirehoseRecordMetadata(BaseModel):
+ shardId: str
+ partitionKey: str
+ approximateArrivalTimestamp: PositiveInt
+ sequenceNumber: str
+ subsequenceNumber: str
+
+
+class KinesisFirehoseRecord(BaseModel):
+ data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes
+ recordId: str
+ approximateArrivalTimestamp: PositiveInt
+ kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]
+
+ @validator("data", pre=True, allow_reuse=True)
+ def data_base64_decode(cls, value):
+ return base64_decode(value)
+
+
+class KinesisFirehoseModel(BaseModel):
+ invocationId: str
+ deliveryStreamArn: str
+ region: str
+ sourceKinesisStreamArn: Optional[str]
+ records: List[KinesisFirehoseRecord]
diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md
index cdcb949d28a..48c244c8df2 100644
--- a/docs/utilities/parser.md
+++ b/docs/utilities/parser.md
@@ -163,6 +163,7 @@ Parser comes with the following built-in models:
| **S3Model** | Lambda Event Source payload for Amazon S3 |
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
+| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **SesModel** | Lambda Event Source payload for Amazon Simple Email Service |
| **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service |
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
@@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
| **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
| **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]` |
| **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` |
+| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it.
2. Parses records in `records` key using your model and returns them in a list. | `List[Model]` |
| **SnsEnvelope** | 1. Parses data using `SnsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
| **SnsSqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses SNS records in `body` key using `SnsNotificationModel`.
3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` |
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`.
2. Parses `body` key using your model and returns it. | `Model` |
diff --git a/tests/events/kinesisFirehoseKinesisEvent.json b/tests/events/kinesisFirehoseKinesisEvent.json
new file mode 100644
index 00000000000..5120dd57ccb
--- /dev/null
+++ b/tests/events/kinesisFirehoseKinesisEvent.json
@@ -0,0 +1,32 @@
+{
+ "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
+ "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
+ "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
+ "region": "us-east-2",
+ "records": [
+ {
+ "data": "SGVsbG8gV29ybGQ=",
+ "recordId": "record1",
+ "approximateArrivalTimestamp": 1664028820148,
+ "kinesisRecordMetadata": {
+ "shardId": "shardId-000000000000",
+ "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
+ "approximateArrivalTimestamp": 1664028820148,
+ "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
+ "subsequenceNumber": ""
+ }
+ },
+ {
+ "data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
+ "recordId": "record2",
+ "approximateArrivalTimestamp": 1664028793294,
+ "kinesisRecordMetadata": {
+ "shardId": "shardId-000000000001",
+ "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
+ "approximateArrivalTimestamp": 1664028793294,
+ "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
+ "subsequenceNumber": ""
+ }
+ }
+ ]
+}
diff --git a/tests/events/kinesisFirehosePutEvent.json b/tests/events/kinesisFirehosePutEvent.json
new file mode 100644
index 00000000000..27aeddd80eb
--- /dev/null
+++ b/tests/events/kinesisFirehosePutEvent.json
@@ -0,0 +1,17 @@
+{
+ "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
+ "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
+ "region": "us-east-2",
+ "records":[
+ {
+ "recordId":"record1",
+ "approximateArrivalTimestamp":1664029185290,
+ "data":"SGVsbG8gV29ybGQ="
+ },
+ {
+ "recordId":"record2",
+ "approximateArrivalTimestamp":1664029186945,
+ "data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
+ }
+ ]
+ }
diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py
index b1b66c63379..907deb40aa0 100644
--- a/tests/functional/parser/schemas.py
+++ b/tests/functional/parser/schemas.py
@@ -95,3 +95,7 @@ class MyALambdaFuncUrlBusiness(BaseModel):
class MyLambdaKafkaBusiness(BaseModel):
key: str
+
+
+class MyKinesisFirehoseBusiness(BaseModel):
+ Hello: str
diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py
new file mode 100644
index 00000000000..59bbd2f4e18
--- /dev/null
+++ b/tests/functional/parser/test_kinesis_firehose.py
@@ -0,0 +1,114 @@
+from typing import List
+
+import pytest
+
+from aws_lambda_powertools.utilities.parser import (
+ ValidationError,
+ envelopes,
+ event_parser,
+)
+from aws_lambda_powertools.utilities.parser.models import (
+ KinesisFirehoseModel,
+ KinesisFirehoseRecord,
+ KinesisFirehoseRecordMetadata,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+from tests.functional.parser.schemas import MyKinesisFirehoseBusiness
+from tests.functional.utils import load_event
+
+
+@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope)
+def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext):
+ assert len(event) == 1
+ assert event[0].Hello == "World"
+
+
+@event_parser(model=KinesisFirehoseModel)
+def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext):
+ assert event.region == "us-east-2"
+ assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
+ assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
+ assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"
+
+ records = list(event.records)
+ assert len(records) == 2
+ record_01: KinesisFirehoseRecord = records[0]
+ assert record_01.approximateArrivalTimestamp == 1664028820148
+ assert record_01.recordId == "record1"
+ assert record_01.data == b"Hello World"
+
+ metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata
+ assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
+ assert metadata_01.subsequenceNumber == ""
+ assert metadata_01.shardId == "shardId-000000000000"
+ assert metadata_01.approximateArrivalTimestamp == 1664028820148
+ assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154"
+
+ record_02: KinesisFirehoseRecord = records[1]
+ assert record_02.approximateArrivalTimestamp == 1664028793294
+ assert record_02.recordId == "record2"
+ assert record_02.data == b'{"Hello": "World"}'
+
+ metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata
+ assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
+ assert metadata_02.subsequenceNumber == ""
+ assert metadata_02.shardId == "shardId-000000000001"
+ assert metadata_02.approximateArrivalTimestamp == 1664028793294
+ assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155"
+
+
+@event_parser(model=KinesisFirehoseModel)
+def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext):
+ assert event.region == "us-east-2"
+ assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
+ assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
+
+ records = list(event.records)
+ assert len(records) == 2
+
+ record_01: KinesisFirehoseRecord = records[0]
+ assert record_01.approximateArrivalTimestamp == 1664029185290
+ assert record_01.recordId == "record1"
+ assert record_01.data == b"Hello World"
+
+ record_02: KinesisFirehoseRecord = records[1]
+ assert record_02.approximateArrivalTimestamp == 1664029186945
+ assert record_02.recordId == "record2"
+ assert record_02.data == b'{"Hello": "World"}'
+
+
+def test_firehose_trigger_event():
+ event_dict = load_event("kinesisFirehoseKinesisEvent.json")
+    event_dict["records"].pop(0)  # drop the first record: its payload is plain text, not JSON, and this envelope test parses record data into a JSON-backed model
+ handle_firehose(event_dict, LambdaContext())
+
+
+def test_firehose_trigger_event_kinesis_no_envelope():
+ event_dict = load_event("kinesisFirehoseKinesisEvent.json")
+ handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
+
+
+def test_firehose_trigger_event_put_no_envelope():
+ event_dict = load_event("kinesisFirehosePutEvent.json")
+ handle_firehose_no_envelope_put(event_dict, LambdaContext())
+
+
+def test_kinesis_trigger_bad_base64_event():
+ event_dict = load_event("kinesisFirehoseKinesisEvent.json")
+ event_dict["records"][0]["data"] = {"bad base64"}
+ with pytest.raises(ValidationError):
+ handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
+
+
+def test_kinesis_trigger_bad_timestamp_event():
+ event_dict = load_event("kinesisFirehoseKinesisEvent.json")
+ event_dict["records"][0]["approximateArrivalTimestamp"] = -1
+ with pytest.raises(ValidationError):
+ handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
+
+
+def test_kinesis_trigger_bad_metadata_timestamp_event():
+ event_dict = load_event("kinesisFirehoseKinesisEvent.json")
+ event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1"
+ with pytest.raises(ValidationError):
+ handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())