Skip to content

Commit 17c6763

Browse files
committed
add result warning, add asdict test, metadata test
1 parent 3a11563 commit 17c6763

File tree

2 files changed

+59
-7
lines changed

2 files changed

+59
-7
lines changed

aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import base64
22
import json
3+
import warnings
34
from dataclasses import dataclass, field
45
from typing import Any, Callable, Dict, Iterator, List, Optional
56

@@ -11,12 +12,24 @@
1112
@dataclass(repr=False, order=False, frozen=True)
1213
class KinesisFirehoseDataTransformationRecordMetadata:
1314
"""
15+
Metadata in Firehose Data Transform Record.
16+
17+
Parameters
18+
----------
19+
partition_keys: Dict[str, str]
20+
A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`
21+
1422
Documentation:
1523
--------------
1624
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
1725
"""
1826

19-
partition_keys: Dict[str, Any] = field(default_factory=lambda: {})
27+
partition_keys: Dict[str, str] = field(default_factory=lambda: {})
28+
29+
def asdict(self) -> Dict:
30+
if self.partition_keys is not None:
31+
return {"partitionKeys": self.partition_keys}
32+
return {}
2033

2134

2235
@dataclass(repr=False, order=False)
@@ -58,13 +71,19 @@ class KinesisFirehoseDataTransformationRecord:
5871
_json_data: Optional[Any] = None
5972

6073
def asdict(self) -> Dict:
74+
if self.result not in ["Ok", "Dropped", "ProcessingFailed"]:
75+
warnings.warn(
76+
stacklevel=1,
77+
message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
78+
)
79+
6180
record: Dict[str, Any] = {
6281
"recordId": self.record_id,
6382
"result": self.result,
6483
"data": self.data,
6584
}
6685
if self.metadata:
67-
record["metadata"] = self.metadata.__dict__
86+
record["metadata"] = self.metadata.asdict()
6887
return record
6988

7089
@property

tests/unit/data_classes/test_kinesis_firehose_response.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,20 @@
88
from tests.functional.utils import load_event
99

1010

11+
def test_kinesis_firehose_response_metadata():
12+
# When we create metadata with partition keys and attach to a firehose response record
13+
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"})
14+
15+
processed_record = KinesisFirehoseDataTransformationRecord(
16+
record_id="test_id",
17+
metadata=metadata_partition,
18+
data="",
19+
)
20+
# Then we should have partition keys available in metadata field with same value
21+
assert processed_record.metadata.partition_keys["year"] == "2023"
22+
assert metadata_partition.asdict() == {"partitionKeys": {"year": "2023"}}
23+
24+
1125
def test_kinesis_firehose_response():
1226
# GIVEN a Kinesis Firehose Event with two records
1327
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
@@ -16,10 +30,8 @@ def test_kinesis_firehose_response():
1630
# WHEN we create a Data Transformation Response without changing the data
1731
response = KinesisFirehoseDataTransformationResponse()
1832
for record in parsed_event.records:
19-
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
2033
processed_record = KinesisFirehoseDataTransformationRecord(
2134
record_id=record.record_id,
22-
metadata=metadata_partition,
2335
data=record.data,
2436
)
2537
response.add_record(record=processed_record)
@@ -39,7 +51,28 @@ def test_kinesis_firehose_response():
3951
assert record_01.data == raw_record_01["data"]
4052
assert record_02.data == raw_record_02["data"]
4153

42-
assert record_01.metadata.partition_keys["year"] == 2023
54+
55+
def test_kinesis_firehose_response_asdict():
56+
# Given the following example response provided by Firehose
57+
sample_response = {
58+
"records": [
59+
{"recordId": "sample_record", "data": "", "result": "Ok", "metadata": {"partitionKeys": {"year": "2023"}}},
60+
],
61+
}
62+
63+
# Then asdict function should be able to return the same value
64+
response = KinesisFirehoseDataTransformationResponse()
65+
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(
66+
partition_keys=sample_response["records"][0]["metadata"]["partitionKeys"],
67+
)
68+
processed_record = KinesisFirehoseDataTransformationRecord(
69+
record_id=sample_response["records"][0]["recordId"],
70+
data=sample_response["records"][0]["data"],
71+
result=sample_response["records"][0]["result"],
72+
metadata=metadata_partition,
73+
)
74+
response.add_record(record=processed_record)
75+
assert response.asdict() == sample_response
4376

4477

4578
def test_kinesis_firehose_create_response():
@@ -54,7 +87,7 @@ def test_kinesis_firehose_create_response():
5487

5588
response = KinesisFirehoseDataTransformationResponse()
5689
for record in parsed_event.records:
57-
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
90+
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"})
5891
processed_record = record.build_data_transformation_response(
5992
result="Ok",
6093
metadata=metadata_partition,
@@ -77,4 +110,4 @@ def test_kinesis_firehose_create_response():
77110
assert record_01.data == base64_encode(arbitrary_data)
78111
assert record_02.data == base64_encode(arbitrary_data)
79112

80-
assert record_01.metadata.partition_keys["year"] == 2023
113+
assert record_01.metadata.partition_keys["year"] == "2023"

0 commit comments

Comments
 (0)