Skip to content

feat: Add Kinesis lambda event support to Parser utility #227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from .base import BaseEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisEnvelope
from .sns import SnsEnvelope
from .sqs import SqsEnvelope

__all__ = ["DynamoDBStreamEnvelope", "EventBridgeEnvelope", "SnsEnvelope", "SqsEnvelope", "BaseEnvelope"]
__all__ = [
"DynamoDBStreamEnvelope",
"EventBridgeEnvelope",
"KinesisEnvelope",
"SnsEnvelope",
"SqsEnvelope",
"BaseEnvelope",
]
43 changes: 43 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from typing import Any, Dict, List, Optional, Union

from ..models import KinesisStreamModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KinesisEnvelope(BaseEnvelope):
"""Kinesis Envelope to extract array of Records

The record's data parameter is a base64 encoded string which is parsed into a bytes array,
though it can also be a JSON encoded string.
Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str,
all items in the list will be parsed as str and not as JSON (and vice versa)
"""

def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
"""Parses records found with model provided

Parameters
----------
data : Dict
Lambda event to be parsed
model : Model
Data model provided to parse after extracting data using envelope

Returns
-------
List
List of records parsed with model provided
"""
logger.debug(f"Parsing incoming data with Kinesis model {KinesisStreamModel}")
parsed_envelope: KinesisStreamModel = KinesisStreamModel.parse_obj(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
parsed_envelope: KinesisStreamModel = KinesisStreamModel.parse_obj(data)
parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)

output = []
logger.debug(f"Parsing Kinesis records in `body` with {model}")
for record in parsed_envelope.Records:
output.append(self._parse(data=record.kinesis.data.decode("utf-8"), model=model))
return output
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
from .kinesis import KinesisStreamModel, KinesisStreamRecord, KinesisStreamRecordPayload
from .ses import SesModel, SesRecordModel
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
from .sqs import SqsModel, SqsRecordModel
Expand All @@ -9,6 +10,9 @@
"EventBridgeModel",
"DynamoDBStreamChangedRecordModel",
"DynamoDBStreamRecordModel",
"KinesisStreamModel",
"KinesisStreamRecord",
"KinesisStreamRecordPayload",
"SesModel",
"SesRecordModel",
"SnsModel",
Expand Down
37 changes: 37 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import base64
from binascii import Error as BinAsciiError
from typing import List

from pydantic import BaseModel, validator
from pydantic.types import PositiveInt
from typing_extensions import Literal


class KinesisStreamRecordPayload(BaseModel):
    """Payload carried by a single Kinesis stream record.

    ``data`` arrives as a base64 encoded string; a pre-validator decodes it
    into raw bytes before the regular ``bytes`` field validation runs.
    """

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Decode the base64 encoded ``data`` field.

        Parameters
        ----------
        value
            Raw value of the ``data`` field as received in the event

        Returns
        -------
        bytes
            The base64 decoded content

        Raises
        ------
        ValueError
            If ``value`` cannot be base64 decoded
        """
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            # Chain the decode error so the root cause is kept on the exception
            raise ValueError("base64 decode failed") from exc


class KinesisStreamRecord(BaseModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class KinesisStreamRecord(BaseModel):
class KinesisDataStreamRecord(BaseModel):

eventSource: Literal["aws:kinesis"]
eventVersion: str
eventID: str
eventName: Literal["aws:kinesis:record"]
invokeIdentityArn: str
awsRegion: str
eventSourceARN: str
kinesis: KinesisStreamRecordPayload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
kinesis: KinesisStreamRecordPayload
kinesis: KinesisDataStreamRecordPayload



class KinesisStreamModel(BaseModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class KinesisStreamModel(BaseModel):
class KinesisDataStreamModel(BaseModel):

Records: List[KinesisStreamRecord]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Records: List[KinesisStreamRecord]
Records: List[KinesisDataStreamRecord]

5 changes: 5 additions & 0 deletions tests/functional/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ class MyAdvancedSnsRecordModel(SnsRecordModel):

class MyAdvancedSnsBusiness(SnsModel):
Records: List[MyAdvancedSnsRecordModel]


class MyKinesisBusiness(BaseModel):
    """Test model with two string fields, ``message`` and ``username``."""

    message: str
    username: str
106 changes: 106 additions & 0 deletions tests/functional/parser/test_kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import Any, List

import pytest

from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import (
    KinesisStreamModel,
    KinesisStreamRecord,
    KinesisStreamRecordPayload,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from aws_lambda_powertools.utilities.parser.models import KinesisStreamModel, KinesisStreamRecordPayload
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamModel, KinesisDataStreamRecordPayload

from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyKinesisBusiness
from tests.functional.parser.utils import load_event


@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisEnvelope)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisEnvelope)
@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)

def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
    """Check the list of business models handed to the handler.

    Parameters
    ----------
    event : List[MyKinesisBusiness]
        Records already parsed into ``MyKinesisBusiness`` instances
    _ : LambdaContext
        Lambda context, unused
    """
    assert len(event) == 1
    # Items of ``event`` are the business model, not the Kinesis stream model
    record: MyKinesisBusiness = event[0]
    assert record.message == "test message"
    assert record.username == "test"


@event_parser(model=KinesisStreamModel)
def handle_kinesis_no_envelope(event: KinesisStreamModel, _: LambdaContext):
    """Check a Kinesis event parsed with ``KinesisStreamModel`` and no envelope.

    Parameters
    ----------
    event : KinesisStreamModel
        The parsed Kinesis stream event
    _ : LambdaContext
        Lambda context, unused
    """
    records = event.Records
    assert len(records) == 2
    # An entry of ``Records`` is a single stream record, not the whole event model
    record: KinesisStreamRecord = records[0]

    assert record.awsRegion == "us-east-2"
    assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
    assert record.eventName == "aws:kinesis:record"
    assert record.eventSource == "aws:kinesis"
    assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    assert record.eventVersion == "1.0"
    assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"

    kinesis: KinesisStreamRecordPayload = record.kinesis
    assert kinesis.approximateArrivalTimestamp == 1545084650.987
    assert kinesis.kinesisSchemaVersion == "1.0"
    assert kinesis.partitionKey == "1"
    assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
    assert kinesis.data == b"Hello, this is a test."


def test_kinesis_trigger_event():
    """Run the envelope-based handler against one well-formed Kinesis record."""
    kinesis_payload = {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",
        "approximateArrivalTimestamp": 1545084650.987,
    }
    stream_record = {
        "kinesis": kinesis_payload,
        "eventSource": "aws:kinesis",
        "eventVersion": "1.0",
        "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
        "eventName": "aws:kinesis:record",
        "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
        "awsRegion": "us-east-2",
        "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
    }
    raw_event = {"Records": [stream_record]}

    handle_kinesis(raw_event, LambdaContext())


def test_kinesis_trigger_bad_base64_event():
    """Expect a ``ValidationError`` when the record's ``data`` field is ``"bad"``."""
    kinesis_payload = {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "bad",
        "approximateArrivalTimestamp": 1545084650.987,
    }
    stream_record = {
        "kinesis": kinesis_payload,
        "eventSource": "aws:kinesis",
        "eventVersion": "1.0",
        "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
        "eventName": "aws:kinesis:record",
        "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
        "awsRegion": "us-east-2",
        "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
    }
    raw_event = {"Records": [stream_record]}

    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(raw_event, LambdaContext())


def test_kinesis_trigger_event_no_envelope():
    """Feed the ``kinesisStreamEvent.json`` fixture to the no-envelope handler."""
    fixture_event = load_event("kinesisStreamEvent.json")
    context = LambdaContext()
    handle_kinesis_no_envelope(fixture_event, context)


def test_validate_event_does_not_conform_with_model_no_envelope():
    """Expect a ``ValidationError`` from the no-envelope handler for a non-Kinesis dict."""
    unrelated_event: Any = {"hello": "s"}
    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(unrelated_event, LambdaContext())


def test_validate_event_does_not_conform_with_model():
    """Expect a ``ValidationError`` from the envelope handler for a non-Kinesis dict."""
    unrelated_event: Any = {"hello": "s"}
    with pytest.raises(ValidationError):
        handle_kinesis(unrelated_event, LambdaContext())