Skip to content

feat(event_source): add Kinesis Firehose Data Transformation data class #3029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 52 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
040358d
support kinesis response
roger-zhangg Aug 30, 2023
6eb8530
Merge branch 'develop' into kinesis
leandrodamascena Aug 30, 2023
7ca016e
fix lint, address Leandro suggestions
roger-zhangg Aug 30, 2023
785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg Aug 30, 2023
708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Aug 30, 2023
8c9db32
remove deleted const
roger-zhangg Aug 30, 2023
f02319f
fix Literal import in 3.7
roger-zhangg Aug 30, 2023
5f55aa7
change to use data-classes
roger-zhangg Sep 1, 2023
2566f62
fix mypy
roger-zhangg Sep 1, 2023
636e9d1
fix typo, make asdict a function
roger-zhangg Sep 1, 2023
d3114c4
Merge branch 'develop' into kinesis
leandrodamascena Sep 4, 2023
3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena Sep 5, 2023
370e156
address Troy/Leandro suggestions
roger-zhangg Sep 5, 2023
99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 5, 2023
029a55c
Merge branch 'develop' into kinesis
leandrodamascena Sep 6, 2023
af1abfe
remove 6MB comment
roger-zhangg Sep 6, 2023
2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 6, 2023
4168e21
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
312830b
fix comments
roger-zhangg Sep 7, 2023
a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 7, 2023
a3ed9f9
address Heitor's suggestion
roger-zhangg Sep 7, 2023
bfbee60
data class default optimization
roger-zhangg Sep 7, 2023
4016446
remove slot for static check
roger-zhangg Sep 7, 2023
5dbb3ff
fix doc, example
roger-zhangg Sep 7, 2023
5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
95b3958
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
e4d75d7
rename r->record
roger-zhangg Sep 11, 2023
f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 11, 2023
46fbe98
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena Sep 12, 2023
ce6ed61
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
d8be53e
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
3a11563
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
17c6763
add result warning, add asdict test, metadata test
roger-zhangg Sep 12, 2023
00051a8
Merge branch 'develop' into kinesis
roger-zhangg Sep 13, 2023
455402a
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
cad6c09
refactor: initial refactoring
heitorlessa Sep 12, 2023
6809e0e
chore: branding
heitorlessa Sep 14, 2023
42a0de9
refactor: use classvar and tuple for perf
heitorlessa Sep 14, 2023
d05ca07
chore: fix rebase issue
heitorlessa Sep 14, 2023
b20137a
chore: fix mypy tuple exactness type
heitorlessa Sep 14, 2023
ef12496
remove Ok in example response,add failure example
roger-zhangg Sep 14, 2023
f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
3e43a25
chore: clean up docs example
heitorlessa Sep 14, 2023
4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa Sep 14, 2023
d1fc1c5
add drop example
roger-zhangg Sep 14, 2023
eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
c039480
docs: give info upfront, name examples
heitorlessa Sep 14, 2023
e11c718
docs: improve transforming records example
heitorlessa Sep 14, 2023
745492c
docs: improve dropping records example
heitorlessa Sep 14, 2023
61ddee9
docs: improve exception example
heitorlessa Sep 14, 2023
6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_firehose_response import (
FirehoseStateDropped,
FirehoseStateFailed,
FirehoseStateOk,
KinesisFirehoseResponce,
KinesisFirehoseResponceFactory,
KinesisFirehoseResponceRecord,
KinesisFirehoseResponceRecordFactory,
KinesisFirehoseResponseRecordMetadata,
KinesisFirehoseResponseRecordMetadataFactory,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
Expand All @@ -37,6 +48,15 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseResponce",
"KinesisFirehoseResponceRecord",
"KinesisFirehoseResponseRecordMetadata",
"FirehoseStateOk",
"FirehoseStateDropped",
"FirehoseStateFailed",
"KinesisFirehoseResponceFactory",
"KinesisFirehoseResponceRecordFactory",
"KinesisFirehoseResponseRecordMetadataFactory",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

import base64
from typing import Callable, Iterator, List, Optional, Union

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# Values accepted for the `result` field of a Firehose response record
# (see the `result` property / factory parameter of the response record below).
FirehoseStateOk = "Ok"
FirehoseStateDropped = "Dropped"
FirehoseStateFailed = "ProcessingFailed"


class KinesisFirehoseResponseRecordMetadata(DictWrapper):
    """Wrapper exposing the ``metadata`` section of a Firehose response record.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    @property
    def _metadata(self) -> Optional[dict]:
        """Raw value stored under the ``metadata`` key.

        This is a plain subscript lookup, so a missing key is not turned into
        ``None``; it could raise KeyError.
        """
        raw_metadata = self["metadata"]
        return raw_metadata

    @property
    def partition_keys(self) -> Optional[dict[str, str]]:
        """Value stored under ``metadata`` -> ``partitionKeys``."""
        metadata_section = self._metadata
        return metadata_section["partitionKeys"]


def KinesisFirehoseResponseRecordMetadataFactory(
    partition_keys: dict[str, str],
    json_deserializer: Optional[Callable] = None,
) -> KinesisFirehoseResponseRecordMetadata:
    """Build a ``KinesisFirehoseResponseRecordMetadata`` from partition keys.

    Parameters
    ----------
    partition_keys : dict[str, str]
        Stored as-is under ``metadata`` -> ``partitionKeys``.
    json_deserializer : Optional[Callable]
        Forwarded unchanged to the wrapper constructor.

    Returns
    -------
    KinesisFirehoseResponseRecordMetadata
        Wrapper around ``{"metadata": {"partitionKeys": partition_keys}}``.
    """
    return KinesisFirehoseResponseRecordMetadata(
        data={"metadata": {"partitionKeys": partition_keys}},
        json_deserializer=json_deserializer,
    )


class KinesisFirehoseResponceRecord(DictWrapper):
    """Single record of a Kinesis Data Firehose transformation response

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    @property
    def record_id(self) -> str:
        """Value stored under ``recordId``"""
        return self["recordId"]

    # NOTE(review): the Union members below are module-level str constants, not
    # types; the annotation is only harmless at runtime because annotations are
    # evaluated lazily in this module. A static checker will likely reject it.
    @property
    def result(self) -> Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed]:
        """Value stored under ``result``; supported values: Ok, Dropped, ProcessingFailed"""
        return self["result"]

    @property
    def data(self) -> str:
        """Value stored under ``data``; the base64-encoded payload"""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseResponseRecordMetadata]:
        """Metadata wrapper over this record's dict, or None when ``metadata`` is absent or falsy"""
        if not self.get("metadata"):
            return None
        return KinesisFirehoseResponseRecordMetadata(self._data)

    @property
    def data_as_bytes(self) -> bytes:
        """``data`` after base64 decoding"""
        encoded_payload = self.data
        return base64.b64decode(encoded_payload)

    @property
    def data_as_text(self) -> str:
        """``data`` after base64 decoding, decoded as UTF-8 text"""
        payload_bytes = self.data_as_bytes
        return payload_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> dict:
        """``data`` decoded to text and passed through the configured JSON deserializer

        The deserialized value is kept on the instance and reused on later accesses.
        """
        cached = self._json_data
        if cached is not None:
            return cached
        self._json_data = self._json_deserializer(self.data_as_text)
        return self._json_data


def KinesisFirehoseResponceRecordFactory(
    record_id: str,
    result: Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed],
    data: str,
    metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None,
    json_deserializer: Optional[Callable] = None,
) -> KinesisFirehoseResponceRecord:
    """Build a ``KinesisFirehoseResponceRecord`` from plain values.

    Parameters
    ----------
    record_id : str
        Stored under ``recordId``.
    result : str
        Stored under ``result``; one of ``FirehoseStateOk``,
        ``FirehoseStateDropped`` or ``FirehoseStateFailed``.
    data : str
        Plain-text payload. It is UTF-8 encoded and then base64-encoded
        before being stored under ``data``.
    metadata : Optional[KinesisFirehoseResponseRecordMetadata]
        When truthy, its ``metadata`` section is copied into the record.
    json_deserializer : Optional[Callable]
        Forwarded unchanged to the record wrapper.

    Returns
    -------
    KinesisFirehoseResponceRecord
        Wrapper around the assembled record dict.
    """
    encoded_data = base64.b64encode(data.encode("utf-8")).decode("utf-8")
    pass_data = {
        "recordId": record_id,
        "result": result,
        "data": encoded_data,
    }
    if metadata:
        # Write into the record dict, not into `data`: `data` is the str payload
        # parameter, and item assignment on a str raises TypeError.
        # The raw mapping is stored (not the wrapper object) because the record's
        # `metadata` property re-wraps the record dict and then reads
        # ["metadata"]["partitionKeys"] from it.
        pass_data["metadata"] = metadata["metadata"]
    return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer)


class KinesisFirehoseResponce(DictWrapper):
    """Kinesis Data Firehose transformation response

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    @property
    def records(self) -> Iterator[KinesisFirehoseResponceRecord]:
        """Lazily wrap each entry of ``records`` in a ``KinesisFirehoseResponceRecord``

        Every wrapper receives this response's JSON deserializer.
        """
        yield from (
            KinesisFirehoseResponceRecord(data=raw_record, json_deserializer=self._json_deserializer)
            for raw_record in self["records"]
        )


def KinesisFirehoseResponceFactory(
    records: List[KinesisFirehoseResponceRecord],
    json_deserializer: Optional[Callable] = None,
) -> KinesisFirehoseResponce:
    """Build a ``KinesisFirehoseResponce`` from a list of response records.

    Parameters
    ----------
    records : List[KinesisFirehoseResponceRecord]
        Stored as-is under ``records``.
    json_deserializer : Optional[Callable]
        Forwarded unchanged to the response wrapper.

    Returns
    -------
    KinesisFirehoseResponce
        Wrapper around ``{"records": records}``.
    """
    return KinesisFirehoseResponce(
        data={"records": records},
        json_deserializer=json_deserializer,
    )
20 changes: 11 additions & 9 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import (
FirehoseStateOk,
KinesisFirehoseEvent,
KinesisFirehoseResponce,
KinesisFirehoseResponceRecordFactory,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


# NOTE(review): this hunk appears to be a diff rendered without +/- markers, so
# pre-change and post-change lines are interleaved (and indentation is lost).
# Confirm against the PR diff before treating it as runnable code.
@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
# presumably the pre-change line; it is immediately rebound below
result = []
result = KinesisFirehoseResponce({})

for record in event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_json

# presumably the pre-change form: a hand-built dict, overwritten by the factory call below
processed_record = {
"recordId": record.record_id,
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
"result": "Ok",
}
# the factory base64-encodes `data` itself, so the JSON text is passed in unencoded
processed_record = KinesisFirehoseResponceRecordFactory(
record_id=record.record_id,
result=FirehoseStateOk,
data=(json.dumps(data)),
)

result.append(processed_record)
# NOTE(review): the KinesisFirehoseResponce class shown in this change defines only
# `records`; no `add_record` is visible — verify it exists on the base class.
result.add_record(processed_record)

# return transformed records
return {"records": result}
# presumably the post-change return; as rendered it follows another return
return result
36 changes: 36 additions & 0 deletions tests/unit/data_classes/test_kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from aws_lambda_powertools.utilities.data_classes import (
FirehoseStateOk,
KinesisFirehoseEvent,
KinesisFirehoseResponceFactory,
KinesisFirehoseResponceRecordFactory,
)
from tests.functional.utils import load_event


def test_kinesis_firehose_response():
    """Echo every event record through the response factories and check the first one."""
    raw_event = load_event("kinesisFirehoseKinesisEvent.json")
    parsed_event = KinesisFirehoseEvent(raw_event)

    # each response record carries the event record's decoded text payload unchanged
    processed_records = [
        KinesisFirehoseResponceRecordFactory(
            record_id=event_record.record_id,
            result=FirehoseStateOk,
            data=event_record.data_as_text,
        )
        for event_record in parsed_event.records
    ]
    response = KinesisFirehoseResponceFactory(processed_records)

    res_records = list(response.records)
    assert len(res_records) == 2
    first_record, _second_record = res_records
    expected_first = raw_event["records"][0]
    assert first_record.result == FirehoseStateOk
    assert first_record.record_id == expected_first["recordId"]
    assert first_record.data_as_bytes == b"Hello World"
    assert first_record.data_as_text == "Hello World"
    # re-encoding the decoded text must reproduce the original base64 payload
    assert first_record.data == expected_first["data"]