From 229bd6fb12fd294631da830e4954fa9b7c7f08bb Mon Sep 17 00:00:00 2001 From: Ran Isenberg Date: Thu, 29 Sep 2022 19:25:41 +0300 Subject: [PATCH 1/3] Feature request: Kinesis Data Firehose event envelope and parser model --- aws_lambda_powertools/shared/functions.py | 2 +- .../utilities/parser/envelopes/__init__.py | 2 + .../utilities/parser/envelopes/kinesis.py | 2 +- .../parser/envelopes/kinesis_firehose.py | 47 ++++++++++++ .../utilities/parser/models/__init__.py | 4 + .../utilities/parser/models/kinesis.py | 12 +-- .../parser/models/kinesis_firehose.py | 33 +++++++++ docs/utilities/parser.md | 2 + ...xtending_built_in_models_with_json_mypy.py | 2 +- tests/events/kinesisFirehoseEvent.json | 31 ++++++++ tests/functional/parser/schemas.py | 4 + .../parser/test_kinesis firehose.py | 74 +++++++++++++++++++ 12 files changed, 202 insertions(+), 13 deletions(-) create mode 100644 aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py create mode 100644 aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py create mode 100644 tests/events/kinesisFirehoseEvent.json create mode 100644 tests/functional/parser/test_kinesis firehose.py diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py index e9bc3521125..03ffa3c7cbc 100644 --- a/aws_lambda_powertools/shared/functions.py +++ b/aws_lambda_powertools/shared/functions.py @@ -67,7 +67,7 @@ def resolve_env_var_choice( def base64_decode(value: str) -> bytes: try: - logger.debug("Decoding base64 Kafka record item before parsing") + logger.debug("Decoding base64 record item before parsing") return base64.b64decode(value) except (BinAsciiError, TypeError): raise ValueError("base64 decode failed") diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index 4b0e4c943a2..0f985f29d88 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ 
b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -6,6 +6,7 @@ from .event_bridge import EventBridgeEnvelope from .kafka import KafkaEnvelope from .kinesis import KinesisDataStreamEnvelope +from .kinesis_firehose import KinesisFirehoseEnvelope from .lambda_function_url import LambdaFunctionUrlEnvelope from .sns import SnsEnvelope, SnsSqsEnvelope from .sqs import SqsEnvelope @@ -17,6 +18,7 @@ "DynamoDBStreamEnvelope", "EventBridgeEnvelope", "KinesisDataStreamEnvelope", + "KinesisFirehoseEnvelope", "LambdaFunctionUrlEnvelope", "SnsEnvelope", "SnsSqsEnvelope", diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index 9ff221a7b7b..24104ebd40c 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope): Regardless of its type it'll be parsed into a BaseModel object. 
Note: Records will be parsed the same way so if model is str, - all items in the list will be parsed as str and npt as JSON (and vice versa) + all items in the list will be parsed as str and not as JSON (and vice versa) """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py new file mode 100644 index 00000000000..c8dd936512c --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py @@ -0,0 +1,47 @@ +import logging +from typing import Any, Dict, List, Optional, Type, Union, cast + +from ..models import KinesisFirehoseModel +from ..types import Model +from .base import BaseEnvelope + +logger = logging.getLogger(__name__) + + +class KinesisFirehoseEnvelope(BaseEnvelope): + """Kinesis Firehose Envelope to extract array of Records + + The record's data parameter is a base64 encoded string which is parsed into a bytes array, + though it can also be a JSON encoded string. + Regardless of its type it'll be parsed into a BaseModel object. 
+ + Note: Records will be parsed the same way so if model is str, + all items in the list will be parsed as str and not as JSON (and vice versa) + + https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html + """ + + def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: + """Parses records found with model provided + + Parameters + ---------- + data : Dict + Lambda event to be parsed + model : Type[Model] + Data model provided to parse after extracting data using envelope + + Returns + ------- + List + List of records parsed with model provided + """ + logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}") + parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data) + logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}") + models = [] + for record in parsed_envelope.records: + # We allow either AWS expected contract (bytes) or a custom Model, see #943 + data = cast(bytes, record.data) + models.append(self._parse(data=data.decode("utf-8"), model=model)) + return models diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 6d403019181..c516318e246 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -19,6 +19,7 @@ from .event_bridge import EventBridgeModel from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload +from .kinesis_firehose import KinesisFirehoseModel, KinesisFirehoseRecord, KinesisFirehoseRecordMetadata from .lambda_function_url import LambdaFunctionUrlModel from .s3 import S3Model, S3RecordModel from .s3_object_event import ( @@ -68,6 +69,9 @@ "KinesisDataStreamModel", "KinesisDataStreamRecord", 
"KinesisDataStreamRecordPayload", + "KinesisFirehoseModel", + "KinesisFirehoseRecord", + "KinesisFirehoseRecordMetadata", "LambdaFunctionUrlModel", "S3Model", "S3RecordModel", diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py index be868ca44ba..ffc89bcbdaa 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py @@ -1,14 +1,10 @@ -import base64 -import logging -from binascii import Error as BinAsciiError from typing import List, Type, Union from pydantic import BaseModel, validator +from aws_lambda_powertools.shared.functions import base64_decode from aws_lambda_powertools.utilities.parser.types import Literal -logger = logging.getLogger(__name__) - class KinesisDataStreamRecordPayload(BaseModel): kinesisSchemaVersion: str @@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel): @validator("data", pre=True, allow_reuse=True) def data_base64_decode(cls, value): - try: - logger.debug("Decoding base64 Kinesis data record before parsing") - return base64.b64decode(value) - except (BinAsciiError, TypeError): - raise ValueError("base64 decode failed") + return base64_decode(value) class KinesisDataStreamRecord(BaseModel): diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py new file mode 100644 index 00000000000..3508738e55e --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py @@ -0,0 +1,33 @@ +from datetime import datetime +from typing import List, Optional, Type, Union + +from pydantic import BaseModel, validator + +from aws_lambda_powertools.shared.functions import base64_decode + + +class KinesisFirehoseRecordMetadata(BaseModel): + shardId: str + partitionKey: str + approximateArrivalTimestamp: str + sequenceNumber: str + subsequenceNumber: str + + +class 
KinesisFirehoseRecord(BaseModel): + data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes + recordId: str + approximateArrivalTimestamp: datetime + kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] + + @validator("data", pre=True, allow_reuse=True) + def data_base64_decode(cls, value): + return base64_decode(value) + + +class KinesisFirehoseModel(BaseModel): + invocationId: str + deliveryStreamArn: str + region: str + sourceKinesisStreamArn: Optional[str] + records: List[KinesisFirehoseRecord] diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index cdcb949d28a..48c244c8df2 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -163,6 +163,7 @@ Parser comes with the following built-in models: | **S3Model** | Lambda Event Source payload for Amazon S3 | | **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda | | **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams | +| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | | **SesModel** | Lambda Event Source payload for Amazon Simple Email Service | | **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service | | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway | @@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return | **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` | | **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]` | | **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` | +| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it.
2. Parses records in `Records` key using your model and returns them in a list. | `List[Model]` | +| **SnsEnvelope** | 1. Parses data using `SnsModel`.<br/>
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` | | **SnsSqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses SNS records in `body` key using `SnsNotificationModel`.
3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` | | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`.
2. Parses `body` key using your model and returns it. | `Model` | diff --git a/examples/parser/src/extending_built_in_models_with_json_mypy.py b/examples/parser/src/extending_built_in_models_with_json_mypy.py index 43e04143347..80314a814ce 100644 --- a/examples/parser/src/extending_built_in_models_with_json_mypy.py +++ b/examples/parser/src/extending_built_in_models_with_json_mypy.py @@ -11,7 +11,7 @@ class CancelOrder(BaseModel): class CancelOrderModel(APIGatewayProxyEventV2Model): - body: Json[CancelOrder] # type: ignore[assignment] + body: Json[CancelOrder] # type: ignore[type-arg] @event_parser(model=CancelOrderModel) diff --git a/tests/events/kinesisFirehoseEvent.json b/tests/events/kinesisFirehoseEvent.json new file mode 100644 index 00000000000..a428a222a3d --- /dev/null +++ b/tests/events/kinesisFirehoseEvent.json @@ -0,0 +1,31 @@ +{ + "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", + "region": "us-east-2", + "records": [ + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record1", + "approximateArrivalTimestamp": 1510772160000, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000000", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", + "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z", + "sequenceNumber": "49546986683135544286507457936321625675700192471156785154", + "subsequenceNumber": "" + } + }, + { + "data": "eyJIZWxsbyI6ICJXb3JsZCJ9", + "recordId": "record2", + "approximateArrivalTimestamp": 151077216000, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000001", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", + "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z", + "sequenceNumber": "49546986683135544286507457936321625675700192471156785155", + "subsequenceNumber": "" + } + } + ] +} \ No newline at end of file diff --git a/tests/functional/parser/schemas.py b/tests/functional/parser/schemas.py index 
b1b66c63379..907deb40aa0 100644 --- a/tests/functional/parser/schemas.py +++ b/tests/functional/parser/schemas.py @@ -95,3 +95,7 @@ class MyALambdaFuncUrlBusiness(BaseModel): class MyLambdaKafkaBusiness(BaseModel): key: str + + +class MyKinesisFirehoseBusiness(BaseModel): + Hello: str diff --git a/tests/functional/parser/test_kinesis firehose.py b/tests/functional/parser/test_kinesis firehose.py new file mode 100644 index 00000000000..e8ed0c8f933 --- /dev/null +++ b/tests/functional/parser/test_kinesis firehose.py @@ -0,0 +1,74 @@ +from typing import List + +import pytest + +from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser +from aws_lambda_powertools.utilities.parser.models import ( + KinesisFirehoseModel, + KinesisFirehoseRecord, + KinesisFirehoseRecordMetadata, +) +from aws_lambda_powertools.utilities.typing import LambdaContext +from tests.functional.parser.schemas import MyKinesisFirehoseBusiness +from tests.functional.utils import load_event + + +@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope) +def handle_kinesis(event: List[MyKinesisFirehoseBusiness], _: LambdaContext): + assert len(event) == 1 + assert event[0].Hello == "World" + + +@event_parser(model=KinesisFirehoseModel) +def handle_kinesis_no_envelope(event: KinesisFirehoseModel, _: LambdaContext): + assert event.region == "us-east-2" + assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.sourceKinesisStreamArn is None + + records = list(event.records) + assert len(records) == 2 + record_01: KinesisFirehoseRecord = records[0] + + convert_time = int(round(record_01.approximateArrivalTimestamp.timestamp() * 1000)) + assert convert_time == 1510772160000 + assert record_01.recordId == "record1" + assert record_01.data == b"Hello World" + + metadata_01: KinesisFirehoseRecordMetadata 
= record_01.kinesisRecordMetadata + assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" + assert metadata_01.subsequenceNumber == "" + assert metadata_01.shardId == "shardId-000000000000" + assert metadata_01.approximateArrivalTimestamp == "2012-04-23T18:25:43.511Z" + assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154" + + record_02: KinesisFirehoseRecord = records[1] + convert_time = int(round(record_02.approximateArrivalTimestamp.timestamp() * 1000)) + assert convert_time == 151077216000 + assert record_02.recordId == "record2" + assert record_02.data == b'{"Hello": "World"}' + + metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata + assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" + assert metadata_02.subsequenceNumber == "" + assert metadata_02.shardId == "shardId-000000000001" + assert metadata_02.approximateArrivalTimestamp == "2012-04-23T19:25:43.511Z" + assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155" + + +def test_kinesis_trigger_event(): + event_dict = load_event("kinesisFirehoseEvent.json") + event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class + handle_kinesis(event_dict, LambdaContext()) + + +def test_kinesis_trigger_event_no_envelope(): + event_dict = load_event("kinesisFirehoseEvent.json") + handle_kinesis_no_envelope(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_base64_event(): + event_dict = load_event("kinesisFirehoseEvent.json") + event_dict["records"][0]["data"] = {"bad base64"} + with pytest.raises(ValidationError): + handle_kinesis_no_envelope(event_dict, LambdaContext()) From 0b482684a695c76d3343c20b3f6beeb6507a0be4 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 6 Oct 2022 15:10:07 +0100 Subject: [PATCH 2/3] feat(parser): adjusts in kinesis firehose parser --- 
.../parser/models/kinesis_firehose.py | 5 +- ....json => kinesisFirehoseKinesisEvent.json} | 11 ++-- tests/events/kinesisFirehosePutEvent.json | 17 ++++++ ...s firehose.py => test_kinesis_firehose.py} | 58 +++++++++++++------ 4 files changed, 65 insertions(+), 26 deletions(-) rename tests/events/{kinesisFirehoseEvent.json => kinesisFirehoseKinesisEvent.json} (75%) create mode 100644 tests/events/kinesisFirehosePutEvent.json rename tests/functional/parser/{test_kinesis firehose.py => test_kinesis_firehose.py} (52%) diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py index 3508738e55e..2e4bdb93e31 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py @@ -1,4 +1,3 @@ -from datetime import datetime from typing import List, Optional, Type, Union from pydantic import BaseModel, validator @@ -9,7 +8,7 @@ class KinesisFirehoseRecordMetadata(BaseModel): shardId: str partitionKey: str - approximateArrivalTimestamp: str + approximateArrivalTimestamp: int sequenceNumber: str subsequenceNumber: str @@ -17,7 +16,7 @@ class KinesisFirehoseRecordMetadata(BaseModel): class KinesisFirehoseRecord(BaseModel): data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes recordId: str - approximateArrivalTimestamp: datetime + approximateArrivalTimestamp: int kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] @validator("data", pre=True, allow_reuse=True) diff --git a/tests/events/kinesisFirehoseEvent.json b/tests/events/kinesisFirehoseKinesisEvent.json similarity index 75% rename from tests/events/kinesisFirehoseEvent.json rename to tests/events/kinesisFirehoseKinesisEvent.json index a428a222a3d..5120dd57ccb 100644 --- a/tests/events/kinesisFirehoseEvent.json +++ b/tests/events/kinesisFirehoseKinesisEvent.json @@ -1,16 +1,17 @@ { "invocationId": 
"2b4d1ad9-2f48-94bd-a088-767c317e994a", + "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source", "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", "region": "us-east-2", "records": [ { "data": "SGVsbG8gV29ybGQ=", "recordId": "record1", - "approximateArrivalTimestamp": 1510772160000, + "approximateArrivalTimestamp": 1664028820148, "kinesisRecordMetadata": { "shardId": "shardId-000000000000", "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", - "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z", + "approximateArrivalTimestamp": 1664028820148, "sequenceNumber": "49546986683135544286507457936321625675700192471156785154", "subsequenceNumber": "" } @@ -18,14 +19,14 @@ { "data": "eyJIZWxsbyI6ICJXb3JsZCJ9", "recordId": "record2", - "approximateArrivalTimestamp": 151077216000, + "approximateArrivalTimestamp": 1664028793294, "kinesisRecordMetadata": { "shardId": "shardId-000000000001", "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", - "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z", + "approximateArrivalTimestamp": 1664028793294, "sequenceNumber": "49546986683135544286507457936321625675700192471156785155", "subsequenceNumber": "" } } ] -} \ No newline at end of file +} diff --git a/tests/events/kinesisFirehosePutEvent.json b/tests/events/kinesisFirehosePutEvent.json new file mode 100644 index 00000000000..27aeddd80eb --- /dev/null +++ b/tests/events/kinesisFirehosePutEvent.json @@ -0,0 +1,17 @@ +{ + "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", + "region": "us-east-2", + "records":[ + { + "recordId":"record1", + "approximateArrivalTimestamp":1664029185290, + "data":"SGVsbG8gV29ybGQ=" + }, + { + "recordId":"record2", + "approximateArrivalTimestamp":1664029186945, + "data":"eyJIZWxsbyI6ICJXb3JsZCJ9" + } + ] + } diff --git 
a/tests/functional/parser/test_kinesis firehose.py b/tests/functional/parser/test_kinesis_firehose.py similarity index 52% rename from tests/functional/parser/test_kinesis firehose.py rename to tests/functional/parser/test_kinesis_firehose.py index e8ed0c8f933..7ff7f3f9800 100644 --- a/tests/functional/parser/test_kinesis firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -14,24 +14,22 @@ @event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope) -def handle_kinesis(event: List[MyKinesisFirehoseBusiness], _: LambdaContext): +def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext): assert len(event) == 1 assert event[0].Hello == "World" @event_parser(model=KinesisFirehoseModel) -def handle_kinesis_no_envelope(event: KinesisFirehoseModel, _: LambdaContext): +def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext): assert event.region == "us-east-2" assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" - assert event.sourceKinesisStreamArn is None + assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source" records = list(event.records) assert len(records) == 2 record_01: KinesisFirehoseRecord = records[0] - - convert_time = int(round(record_01.approximateArrivalTimestamp.timestamp() * 1000)) - assert convert_time == 1510772160000 + assert record_01.approximateArrivalTimestamp == 1664028820148 assert record_01.recordId == "record1" assert record_01.data == b"Hello World" @@ -39,12 +37,11 @@ def handle_kinesis_no_envelope(event: KinesisFirehoseModel, _: LambdaContext): assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" assert metadata_01.subsequenceNumber == "" assert metadata_01.shardId == "shardId-000000000000" - assert metadata_01.approximateArrivalTimestamp == 
"2012-04-23T18:25:43.511Z" + assert metadata_01.approximateArrivalTimestamp == 1664028820148 assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154" record_02: KinesisFirehoseRecord = records[1] - convert_time = int(round(record_02.approximateArrivalTimestamp.timestamp() * 1000)) - assert convert_time == 151077216000 + assert record_02.approximateArrivalTimestamp == 1664028793294 assert record_02.recordId == "record2" assert record_02.data == b'{"Hello": "World"}' @@ -52,23 +49,48 @@ def handle_kinesis_no_envelope(event: KinesisFirehoseModel, _: LambdaContext): assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" assert metadata_02.subsequenceNumber == "" assert metadata_02.shardId == "shardId-000000000001" - assert metadata_02.approximateArrivalTimestamp == "2012-04-23T19:25:43.511Z" + assert metadata_02.approximateArrivalTimestamp == 1664028793294 assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155" -def test_kinesis_trigger_event(): - event_dict = load_event("kinesisFirehoseEvent.json") +@event_parser(model=KinesisFirehoseModel) +def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext): + assert event.region == "us-east-2" + assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + + records = list(event.records) + assert len(records) == 2 + + record_01: KinesisFirehoseRecord = records[0] + assert record_01.approximateArrivalTimestamp == 1664029185290 + assert record_01.recordId == "record1" + assert record_01.data == b"Hello World" + + record_02: KinesisFirehoseRecord = records[1] + assert record_02.approximateArrivalTimestamp == 1664029186945 + assert record_02.recordId == "record2" + assert record_02.data == b'{"Hello": "World"}' + + +def test_firehose_trigger_event(): + event_dict = 
load_event("kinesisFirehoseKinesisEvent.json") event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class - handle_kinesis(event_dict, LambdaContext()) + handle_firehose(event_dict, LambdaContext()) + + +def test_firehose_trigger_event_kinesis_no_envelope(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) -def test_kinesis_trigger_event_no_envelope(): - event_dict = load_event("kinesisFirehoseEvent.json") - handle_kinesis_no_envelope(event_dict, LambdaContext()) +def test_firehose_trigger_event_put_no_envelope(): + event_dict = load_event("kinesisFirehosePutEvent.json") + handle_firehose_no_envelope_put(event_dict, LambdaContext()) def test_kinesis_trigger_bad_base64_event(): - event_dict = load_event("kinesisFirehoseEvent.json") + event_dict = load_event("kinesisFirehoseKinesisEvent.json") event_dict["records"][0]["data"] = {"bad base64"} with pytest.raises(ValidationError): - handle_kinesis_no_envelope(event_dict, LambdaContext()) + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) From ca73ecd03a8a3fcfc4b2cb02dfd38b750a3d1f20 Mon Sep 17 00:00:00 2001 From: Ran Isenberg Date: Sun, 9 Oct 2022 14:13:28 +0300 Subject: [PATCH 3/3] add positive int and fix develop conflict --- .../utilities/parser/models/__init__.py | 19 +++++++++++++++--- .../parser/models/kinesis_firehose.py | 6 +++--- .../parser/test_kinesis_firehose.py | 20 ++++++++++++++++++- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index ece50195eef..62e28a62374 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -26,9 +26,22 @@ DynamoDBStreamRecordModel, ) from .event_bridge import EventBridgeModel -from .kafka import KafkaBaseEventModel, 
KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel -from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload -from .kinesis_firehose import KinesisFirehoseModel, KinesisFirehoseRecord, KinesisFirehoseRecordMetadata +from .kafka import ( + KafkaBaseEventModel, + KafkaMskEventModel, + KafkaRecordModel, + KafkaSelfManagedEventModel, +) +from .kinesis import ( + KinesisDataStreamModel, + KinesisDataStreamRecord, + KinesisDataStreamRecordPayload, +) +from .kinesis_firehose import ( + KinesisFirehoseModel, + KinesisFirehoseRecord, + KinesisFirehoseRecordMetadata, +) from .lambda_function_url import LambdaFunctionUrlModel from .s3 import S3Model, S3RecordModel from .s3_object_event import ( diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py index 2e4bdb93e31..c59d8c680e5 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py @@ -1,6 +1,6 @@ from typing import List, Optional, Type, Union -from pydantic import BaseModel, validator +from pydantic import BaseModel, PositiveInt, validator from aws_lambda_powertools.shared.functions import base64_decode @@ -8,7 +8,7 @@ class KinesisFirehoseRecordMetadata(BaseModel): shardId: str partitionKey: str - approximateArrivalTimestamp: int + approximateArrivalTimestamp: PositiveInt sequenceNumber: str subsequenceNumber: str @@ -16,7 +16,7 @@ class KinesisFirehoseRecordMetadata(BaseModel): class KinesisFirehoseRecord(BaseModel): data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes recordId: str - approximateArrivalTimestamp: int + approximateArrivalTimestamp: PositiveInt kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] @validator("data", pre=True, allow_reuse=True) diff --git a/tests/functional/parser/test_kinesis_firehose.py 
b/tests/functional/parser/test_kinesis_firehose.py index 7ff7f3f9800..59bbd2f4e18 100644 --- a/tests/functional/parser/test_kinesis_firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -2,7 +2,11 @@ import pytest -from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser +from aws_lambda_powertools.utilities.parser import ( + ValidationError, + envelopes, + event_parser, +) from aws_lambda_powertools.utilities.parser.models import ( KinesisFirehoseModel, KinesisFirehoseRecord, @@ -94,3 +98,17 @@ def test_kinesis_trigger_bad_base64_event(): event_dict["records"][0]["data"] = {"bad base64"} with pytest.raises(ValidationError): handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_timestamp_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"][0]["approximateArrivalTimestamp"] = -1 + with pytest.raises(ValidationError): + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext()) + + +def test_kinesis_trigger_bad_metadata_timestamp_event(): + event_dict = load_event("kinesisFirehoseKinesisEvent.json") + event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1" + with pytest.raises(ValidationError): + handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())