Skip to content

Commit b9ec28d

Browse files
committed
* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python: fix: s3 model import docs: add Kinesis Streams as a supported model & envelope cr fixes feat: Add Kinesis lambda event support to Parser utility feat: Add Kinesis lambda event support to Parser utility
2 parents 06d09a7 + ee18f83 commit b9ec28d

File tree

7 files changed

+203
-0
lines changed

7 files changed

+203
-0
lines changed

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
from .cloudwatch import CloudWatchLogsEnvelope
33
from .dynamodb import DynamoDBStreamEnvelope
44
from .event_bridge import EventBridgeEnvelope
5+
from .kinesis import KinesisDataStreamEnvelope
56
from .sns import SnsEnvelope
67
from .sqs import SqsEnvelope
78

89
__all__ = [
910
"CloudWatchLogsEnvelope",
1011
"DynamoDBStreamEnvelope",
1112
"EventBridgeEnvelope",
13+
"KinesisDataStreamEnvelope",
1214
"SnsEnvelope",
1315
"SqsEnvelope",
1416
"BaseEnvelope",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Union
3+
4+
from ..models import KinesisDataStreamModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class KinesisDataStreamEnvelope(BaseEnvelope):
12+
"""Kinesis Data Stream Envelope to extract array of Records
13+
14+
The record's data parameter is a base64 encoded string which is parsed into a bytes array,
15+
though it can also be a JSON encoded string.
16+
Regardless of its type it'll be parsed into a BaseModel object.
17+
18+
Note: Records will be parsed the same way so if model is str,
19+
all items in the list will be parsed as str and npt as JSON (and vice versa)
20+
"""
21+
22+
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
23+
"""Parses records found with model provided
24+
25+
Parameters
26+
----------
27+
data : Dict
28+
Lambda event to be parsed
29+
model : Model
30+
Data model provided to parse after extracting data using envelope
31+
32+
Returns
33+
-------
34+
List
35+
List of records parsed with model provided
36+
"""
37+
logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
38+
parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
39+
output = []
40+
logger.debug(f"Parsing Kinesis records in `body` with {model}")
41+
for record in parsed_envelope.Records:
42+
output.append(self._parse(data=record.kinesis.data.decode("utf-8"), model=model))
43+
return output

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
33
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
44
from .event_bridge import EventBridgeModel
5+
from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
56
from .s3 import S3Model, S3RecordModel
67
from .ses import SesModel, SesRecordModel
78
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
@@ -19,6 +20,9 @@
1920
"EventBridgeModel",
2021
"DynamoDBStreamChangedRecordModel",
2122
"DynamoDBStreamRecordModel",
23+
"KinesisDataStreamModel",
24+
"KinesisDataStreamRecord",
25+
"KinesisDataStreamRecordPayload",
2226
"S3Model",
2327
"S3RecordModel",
2428
"SesModel",
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import base64
2+
import logging
3+
from binascii import Error as BinAsciiError
4+
from typing import List
5+
6+
from pydantic import BaseModel, validator
7+
from pydantic.types import PositiveInt
8+
from typing_extensions import Literal
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class KinesisDataStreamRecordPayload(BaseModel):
14+
kinesisSchemaVersion: str
15+
partitionKey: str
16+
sequenceNumber: PositiveInt
17+
data: bytes # base64 encoded str is parsed into bytes
18+
approximateArrivalTimestamp: float
19+
20+
@validator("data", pre=True)
21+
def data_base64_decode(cls, value):
22+
try:
23+
logger.debug("Decoding base64 Kinesis data record before parsing")
24+
return base64.b64decode(value)
25+
except (BinAsciiError, TypeError):
26+
raise ValueError("base64 decode failed")
27+
28+
29+
class KinesisDataStreamRecord(BaseModel):
30+
eventSource: Literal["aws:kinesis"]
31+
eventVersion: str
32+
eventID: str
33+
eventName: Literal["aws:kinesis:record"]
34+
invokeIdentityArn: str
35+
awsRegion: str
36+
eventSourceARN: str
37+
kinesis: KinesisDataStreamRecordPayload
38+
39+
40+
class KinesisDataStreamModel(BaseModel):
41+
Records: List[KinesisDataStreamRecord]

docs/content/utilities/parser.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ Model name | Description
159159
**AlbModel** | Lambda Event Source payload for Amazon Application Load Balancer
160160
**CloudwatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs
161161
**S3Model** | Lambda Event Source payload for Amazon S3
162+
**KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams
162163

163164
You can extend them to include your own models, and yet have all other known fields parsed along the way.
164165

@@ -296,6 +297,7 @@ Envelope name | Behaviour | Return
296297
**EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`. <br/> 2. Parses `detail` key using your model and returns it. | `Model`
297298
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
298299
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
300+
**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]`
299301

300302
### Bringing your own envelope
301303

tests/functional/parser/schemas.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ class MyAdvancedSnsBusiness(SnsModel):
7373
Records: List[MyAdvancedSnsRecordModel]
7474

7575

76+
class MyKinesisBusiness(BaseModel):
77+
message: str
78+
username: str
79+
80+
7681
class MyCloudWatchBusiness(BaseModel):
7782
my_message: str
7883
user: str
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from typing import Any, List
2+
3+
import pytest
4+
5+
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
6+
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamModel, KinesisDataStreamRecordPayload
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
from tests.functional.parser.schemas import MyKinesisBusiness
9+
from tests.functional.parser.utils import load_event
10+
11+
12+
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)
13+
def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
14+
assert len(event) == 1
15+
record: KinesisDataStreamModel = event[0]
16+
assert record.message == "test message"
17+
assert record.username == "test"
18+
19+
20+
@event_parser(model=KinesisDataStreamModel)
21+
def handle_kinesis_no_envelope(event: KinesisDataStreamModel, _: LambdaContext):
22+
records = event.Records
23+
assert len(records) == 2
24+
record: KinesisDataStreamModel = records[0]
25+
26+
assert record.awsRegion == "us-east-2"
27+
assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
28+
assert record.eventName == "aws:kinesis:record"
29+
assert record.eventSource == "aws:kinesis"
30+
assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
31+
assert record.eventVersion == "1.0"
32+
assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"
33+
34+
kinesis: KinesisDataStreamRecordPayload = record.kinesis
35+
assert kinesis.approximateArrivalTimestamp == 1545084650.987
36+
assert kinesis.kinesisSchemaVersion == "1.0"
37+
assert kinesis.partitionKey == "1"
38+
assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
39+
assert kinesis.data == b"Hello, this is a test."
40+
41+
42+
def test_kinesis_trigger_event():
43+
event_dict = {
44+
"Records": [
45+
{
46+
"kinesis": {
47+
"kinesisSchemaVersion": "1.0",
48+
"partitionKey": "1",
49+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
50+
"data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",
51+
"approximateArrivalTimestamp": 1545084650.987,
52+
},
53+
"eventSource": "aws:kinesis",
54+
"eventVersion": "1.0",
55+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
56+
"eventName": "aws:kinesis:record",
57+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
58+
"awsRegion": "us-east-2",
59+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
60+
}
61+
]
62+
}
63+
64+
handle_kinesis(event_dict, LambdaContext())
65+
66+
67+
def test_kinesis_trigger_bad_base64_event():
68+
event_dict = {
69+
"Records": [
70+
{
71+
"kinesis": {
72+
"kinesisSchemaVersion": "1.0",
73+
"partitionKey": "1",
74+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
75+
"data": "bad",
76+
"approximateArrivalTimestamp": 1545084650.987,
77+
},
78+
"eventSource": "aws:kinesis",
79+
"eventVersion": "1.0",
80+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
81+
"eventName": "aws:kinesis:record",
82+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
83+
"awsRegion": "us-east-2",
84+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
85+
}
86+
]
87+
}
88+
with pytest.raises(ValidationError):
89+
handle_kinesis_no_envelope(event_dict, LambdaContext())
90+
91+
92+
def test_kinesis_trigger_event_no_envelope():
93+
event_dict = load_event("kinesisStreamEvent.json")
94+
handle_kinesis_no_envelope(event_dict, LambdaContext())
95+
96+
97+
def test_validate_event_does_not_conform_with_model_no_envelope():
98+
event_dict: Any = {"hello": "s"}
99+
with pytest.raises(ValidationError):
100+
handle_kinesis_no_envelope(event_dict, LambdaContext())
101+
102+
103+
def test_validate_event_does_not_conform_with_model():
104+
event_dict: Any = {"hello": "s"}
105+
with pytest.raises(ValidationError):
106+
handle_kinesis(event_dict, LambdaContext())

0 commit comments

Comments
 (0)