Skip to content

feat(event_source): add Kinesis Firehose Data Transformation data class #3029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 52 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
040358d
support kinesis response
roger-zhangg Aug 30, 2023
6eb8530
Merge branch 'develop' into kinesis
leandrodamascena Aug 30, 2023
7ca016e
fix lint, address Leandro suggestions
roger-zhangg Aug 30, 2023
785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg Aug 30, 2023
708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Aug 30, 2023
8c9db32
remove deleted const
roger-zhangg Aug 30, 2023
f02319f
fix Literal import in 3.7
roger-zhangg Aug 30, 2023
5f55aa7
change to use data-classes
roger-zhangg Sep 1, 2023
2566f62
fix mypy
roger-zhangg Sep 1, 2023
636e9d1
fix typo, make asdict a function
roger-zhangg Sep 1, 2023
d3114c4
Merge branch 'develop' into kinesis
leandrodamascena Sep 4, 2023
3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena Sep 5, 2023
370e156
address Troy/Leandro suggestions
roger-zhangg Sep 5, 2023
99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 5, 2023
029a55c
Merge branch 'develop' into kinesis
leandrodamascena Sep 6, 2023
af1abfe
remove 6MB comment
roger-zhangg Sep 6, 2023
2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 6, 2023
4168e21
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
312830b
fix comments
roger-zhangg Sep 7, 2023
a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 7, 2023
a3ed9f9
address Heitor's suggestion
roger-zhangg Sep 7, 2023
bfbee60
data class default optimization
roger-zhangg Sep 7, 2023
4016446
remove slot for static check
roger-zhangg Sep 7, 2023
5dbb3ff
fix doc, example
roger-zhangg Sep 7, 2023
5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
95b3958
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
e4d75d7
rename r->record
roger-zhangg Sep 11, 2023
f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 11, 2023
46fbe98
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena Sep 12, 2023
ce6ed61
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
d8be53e
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
3a11563
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
17c6763
add result warning, add asdict test, metadata test
roger-zhangg Sep 12, 2023
00051a8
Merge branch 'develop' into kinesis
roger-zhangg Sep 13, 2023
455402a
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
cad6c09
refactor: initial refactoring
heitorlessa Sep 12, 2023
6809e0e
chore: branding
heitorlessa Sep 14, 2023
42a0de9
refactor: use classvar and tuple for perf
heitorlessa Sep 14, 2023
d05ca07
chore: fix rebase issue
heitorlessa Sep 14, 2023
b20137a
chore: fix mypy tuple exactness type
heitorlessa Sep 14, 2023
ef12496
remove Ok in example response,add failure example
roger-zhangg Sep 14, 2023
f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
3e43a25
chore: clean up docs example
heitorlessa Sep 14, 2023
4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa Sep 14, 2023
d1fc1c5
add drop example
roger-zhangg Sep 14, 2023
eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
c039480
docs: give info upfront, name examples
heitorlessa Sep 14, 2023
e11c718
docs: improve transforming records example
heitorlessa Sep 14, 2023
745492c
docs: improve dropping records example
heitorlessa Sep 14, 2023
61ddee9
docs: improve exception example
heitorlessa Sep 14, 2023
6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_firehose_event import (
KinesisFirehoseEvent,
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
KinesisFirehoseResponseRecordMetadata,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
Expand All @@ -37,6 +42,9 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseResponse",
"KinesisFirehoseResponseRecord",
"KinesisFirehoseResponseRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
Expand Down
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/data_classes/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class DictWrapper(Mapping):

def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
"""
Parameters
ParametersW
----------
data : Dict[str, Any]
Lambda Event Source Event payload
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,126 @@
import base64
from typing import Iterator, Optional
import json
import sys
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal


@dataclass
class KinesisFirehoseResponseRecordMetadata:
"""
Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""

partition_keys: Optional[Dict[str, str]]

@property
def asdict(self) -> Optional[Dict]:
if self.partition_keys is not None:
return {"partitionKeys": self.partition_keys}
return None


@dataclass
class KinesisFirehoseResponseRecord:
"""Record in Kinesis Data Firehose event

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
"""

"""Record ID; uniquely identifies this record within the current batch"""
record_id: str
"""processing result, supported value: Ok, Dropped, ProcessingFailed"""
result: Literal["Ok", "Dropped", "ProcessingFailed"]
"""The data blob, base64-encoded, optional at init"""
data: Optional[str] = None
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None
"""Json data for caching json.dump result"""
_json_data: Optional[Any] = None
json_serializer: Callable = json.dumps
json_deserializer: Callable = json.loads

def data_from_byte(self, data: bytes):
"""Populate data field using a byte like data"""
self.data = base64.b64encode(data).decode("utf-8")

def data_from_text(self, data: str):
"""Populate data field using a string like data"""
self.data_from_byte(data.encode("utf-8"))

def data_from_json(self, data: Any):
"""Populate data field using any structure that could be converted to json"""
self.data_from_text(data=self.json_serializer(data))

@property
def asdict(self) -> Dict:
r: Dict[str, Any] = {
"recordId": self.record_id,
"result": self.result,
"data": self.data,
}
if self.metadata:
r["metadata"] = self.metadata.asdict
return r

@property
def data_as_bytes(self) -> Optional[bytes]:
"""Decoded base64-encoded data as bytes"""
if not self.data:
return None
return base64.b64decode(self.data)

@property
def data_as_text(self) -> Optional[str]:
"""Decoded base64-encoded data as text"""
if not self.data_as_bytes:
return None
return self.data_as_bytes.decode("utf-8")

@property
def data_as_json(self) -> Optional[Dict]:
"""Decoded base64-encoded data loaded to json"""
if not self.data_as_text:
return None
if self._json_data is None:
self._json_data = self.json_deserializer(self.data_as_text)
return self._json_data


@dataclass
class KinesisFirehoseResponse:
"""Kinesis Data Firehose event

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
"""

records: Optional[List[KinesisFirehoseResponseRecord]] = None

def add_record(self, record: KinesisFirehoseResponseRecord):
if self.records:
self.records.append(record)
else:
self.records = [record]

@property
def asdict(self) -> Dict:
if not self.records:
return {}
return {"records": [r.asdict for r in self.records]}


class KinesisFirehoseRecordMetadata(DictWrapper):
@property
Expand Down Expand Up @@ -77,6 +195,13 @@ def data_as_json(self) -> dict:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data

def create_firehose_response_record(
self,
result: Literal["Ok", "Dropped", "ProcessingFailed"],
data: Optional[str] = None,
) -> KinesisFirehoseResponseRecord:
return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data)


class KinesisFirehoseEvent(DictWrapper):
"""Kinesis Data Firehose event
Expand Down
19 changes: 8 additions & 11 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseEvent,
KinesisFirehoseResponse,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
result = []
result = KinesisFirehoseResponse()

for record in event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_json

processed_record = {
"recordId": record.record_id,
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
"result": "Ok",
}
## do all kind of stuff with data

processed_record = record.create_firehose_response_record(result="Ok")
processed_record.data_from_json(data=data)

result.append(processed_record)
result.add_record(processed_record)

# return transformed records
return {"records": result}
return result.asdict
63 changes: 63 additions & 0 deletions tests/unit/data_classes/test_kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseEvent,
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
)
from tests.functional.utils import load_event


def test_kinesis_firehose_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text

processed_record = KinesisFirehoseResponseRecord(
record_id=record.record_id,
result="Ok",
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict

res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]

assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"


def test_kinesis_firehose_create_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text

processed_record = record.create_firehose_response_record(
result="Ok",
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict

res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]

assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"