diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
index 8f68f0cca36..d9d820aede0 100644
--- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py
@@ -2,6 +2,7 @@
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
+from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope
from .sqs import SqsEnvelope
@@ -9,6 +10,7 @@
"CloudWatchLogsEnvelope",
"DynamoDBStreamEnvelope",
"EventBridgeEnvelope",
+ "KinesisDataStreamEnvelope",
"SnsEnvelope",
"SqsEnvelope",
"BaseEnvelope",
diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
new file mode 100644
index 00000000000..97ad7bffec7
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
@@ -0,0 +1,43 @@
+import logging
+from typing import Any, Dict, List, Optional, Union
+
+from ..models import KinesisDataStreamModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KinesisDataStreamEnvelope(BaseEnvelope):
+ """Kinesis Data Stream Envelope to extract array of Records
+
+ The record's data parameter is a base64 encoded string which is parsed into a bytes array,
+ though it can also be a JSON encoded string.
+ Regardless of its type it'll be parsed into a BaseModel object.
+
+ Note: Records will be parsed the same way so if model is str,
+ all items in the list will be parsed as str and npt as JSON (and vice versa)
+ """
+
+ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
+ """Parses records found with model provided
+
+ Parameters
+ ----------
+ data : Dict
+ Lambda event to be parsed
+ model : Model
+ Data model provided to parse after extracting data using envelope
+
+ Returns
+ -------
+ List
+ List of records parsed with model provided
+ """
+ logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
+ parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
+ output = []
+ logger.debug(f"Parsing Kinesis records in `body` with {model}")
+ for record in parsed_envelope.Records:
+ output.append(self._parse(data=record.kinesis.data.decode("utf-8"), model=model))
+ return output
diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py
index c6efec0643a..45230b8c79e 100644
--- a/aws_lambda_powertools/utilities/parser/models/__init__.py
+++ b/aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -2,6 +2,7 @@
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
+from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
from .s3 import S3Model, S3RecordModel
from .ses import SesModel, SesRecordModel
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
@@ -19,6 +20,9 @@
"EventBridgeModel",
"DynamoDBStreamChangedRecordModel",
"DynamoDBStreamRecordModel",
+ "KinesisDataStreamModel",
+ "KinesisDataStreamRecord",
+ "KinesisDataStreamRecordPayload",
"S3Model",
"S3RecordModel",
"SesModel",
diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py
new file mode 100644
index 00000000000..d2852e9f4a8
--- /dev/null
+++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py
@@ -0,0 +1,41 @@
+import base64
+import logging
+from binascii import Error as BinAsciiError
+from typing import List
+
+from pydantic import BaseModel, validator
+from pydantic.types import PositiveInt
+from typing_extensions import Literal
+
+logger = logging.getLogger(__name__)
+
+
+class KinesisDataStreamRecordPayload(BaseModel):
+ kinesisSchemaVersion: str
+ partitionKey: str
+ sequenceNumber: PositiveInt
+ data: bytes # base64 encoded str is parsed into bytes
+ approximateArrivalTimestamp: float
+
+ @validator("data", pre=True)
+ def data_base64_decode(cls, value):
+ try:
+ logger.debug("Decoding base64 Kinesis data record before parsing")
+ return base64.b64decode(value)
+ except (BinAsciiError, TypeError):
+ raise ValueError("base64 decode failed")
+
+
+class KinesisDataStreamRecord(BaseModel):
+ eventSource: Literal["aws:kinesis"]
+ eventVersion: str
+ eventID: str
+ eventName: Literal["aws:kinesis:record"]
+ invokeIdentityArn: str
+ awsRegion: str
+ eventSourceARN: str
+ kinesis: KinesisDataStreamRecordPayload
+
+
+class KinesisDataStreamModel(BaseModel):
+ Records: List[KinesisDataStreamRecord]
diff --git a/docs/content/utilities/parser.mdx b/docs/content/utilities/parser.mdx
index b620f0eb18f..9b7457ef21a 100644
--- a/docs/content/utilities/parser.mdx
+++ b/docs/content/utilities/parser.mdx
@@ -159,6 +159,7 @@ Model name | Description
**AlbModel** | Lambda Event Source payload for Amazon Application Load Balancer
**CloudwatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs
**S3Model** | Lambda Event Source payload for Amazon S3
+**KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams
You can extend them to include your own models, and yet have all other known fields parsed along the way.
@@ -296,6 +297,7 @@ Envelope name | Behaviour | Return
**EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`.
2. Parses `detail` key using your model and returns it. | `Model`
**SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
+**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]`
### Bringing your own envelope
diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py
index 23614a138b4..f3aa39200a9 100644
--- a/tests/functional/parser/schemas.py
+++ b/tests/functional/parser/schemas.py
@@ -73,6 +73,11 @@ class MyAdvancedSnsBusiness(SnsModel):
Records: List[MyAdvancedSnsRecordModel]
+class MyKinesisBusiness(BaseModel):
+ message: str
+ username: str
+
+
class MyCloudWatchBusiness(BaseModel):
my_message: str
user: str
diff --git a/tests/functional/parser/test_kinesis.py b/tests/functional/parser/test_kinesis.py
new file mode 100644
index 00000000000..5a7a94e0dac
--- /dev/null
+++ b/tests/functional/parser/test_kinesis.py
@@ -0,0 +1,106 @@
+from typing import Any, List
+
+import pytest
+
+from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
+from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamModel, KinesisDataStreamRecordPayload
+from aws_lambda_powertools.utilities.typing import LambdaContext
+from tests.functional.parser.schemas import MyKinesisBusiness
+from tests.functional.parser.utils import load_event
+
+
+@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)
+def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
+ assert len(event) == 1
+ record: KinesisDataStreamModel = event[0]
+ assert record.message == "test message"
+ assert record.username == "test"
+
+
+@event_parser(model=KinesisDataStreamModel)
+def handle_kinesis_no_envelope(event: KinesisDataStreamModel, _: LambdaContext):
+ records = event.Records
+ assert len(records) == 2
+ record: KinesisDataStreamModel = records[0]
+
+ assert record.awsRegion == "us-east-2"
+ assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
+ assert record.eventName == "aws:kinesis:record"
+ assert record.eventSource == "aws:kinesis"
+ assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ assert record.eventVersion == "1.0"
+ assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"
+
+ kinesis: KinesisDataStreamRecordPayload = record.kinesis
+ assert kinesis.approximateArrivalTimestamp == 1545084650.987
+ assert kinesis.kinesisSchemaVersion == "1.0"
+ assert kinesis.partitionKey == "1"
+ assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
+ assert kinesis.data == b"Hello, this is a test."
+
+
+def test_kinesis_trigger_event():
+ event_dict = {
+ "Records": [
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
+ "data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",
+ "approximateArrivalTimestamp": 1545084650.987,
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
+ }
+ ]
+ }
+
+ handle_kinesis(event_dict, LambdaContext())
+
+
+def test_kinesis_trigger_bad_base64_event():
+ event_dict = {
+ "Records": [
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
+ "data": "bad",
+ "approximateArrivalTimestamp": 1545084650.987,
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
+ }
+ ]
+ }
+ with pytest.raises(ValidationError):
+ handle_kinesis_no_envelope(event_dict, LambdaContext())
+
+
+def test_kinesis_trigger_event_no_envelope():
+ event_dict = load_event("kinesisStreamEvent.json")
+ handle_kinesis_no_envelope(event_dict, LambdaContext())
+
+
+def test_validate_event_does_not_conform_with_model_no_envelope():
+ event_dict: Any = {"hello": "s"}
+ with pytest.raises(ValidationError):
+ handle_kinesis_no_envelope(event_dict, LambdaContext())
+
+
+def test_validate_event_does_not_conform_with_model():
+ event_dict: Any = {"hello": "s"}
+ with pytest.raises(ValidationError):
+ handle_kinesis(event_dict, LambdaContext())