Skip to content

Commit 636e9d1

Browse files
committedSep 1, 2023
fix typo, make asdict a function
1 parent 2566f62 commit 636e9d1

File tree

6 files changed

+64
-17
lines changed

6 files changed

+64
-17
lines changed
 

‎aws_lambda_powertools/utilities/data_classes/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class DictWrapper(Mapping):
1515

1616
def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
1717
"""
18-
ParametersW
18+
Parameters
1919
----------
2020
data : Dict[str, Any]
2121
Lambda Event Source Event payload

‎aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
import base64
22
import json
3-
import sys
43
from dataclasses import dataclass
54
from typing import Any, Callable, Dict, Iterator, List, Optional
65

7-
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
from typing_extensions import Literal
87

9-
if sys.version_info >= (3, 8):
10-
from typing import Literal
11-
else:
12-
from typing_extensions import Literal
8+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
139

1410

1511
@dataclass
@@ -22,7 +18,6 @@ class KinesisFirehoseResponseRecordMetadata:
2218

2319
partition_keys: Optional[Dict[str, str]]
2420

25-
@property
2621
def asdict(self) -> Optional[Dict]:
2722
if self.partition_keys is not None:
2823
return {"partitionKeys": self.partition_keys}
@@ -42,7 +37,8 @@ class KinesisFirehoseResponseRecord:
4237
record_id: str
4338
"""processing result, supported value: Ok, Dropped, ProcessingFailed"""
4439
result: Literal["Ok", "Dropped", "ProcessingFailed"]
45-
"""The data blob, base64-encoded, optional at init"""
40+
"""data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
41+
use either function like `data_from_text`, `data_from_json` to populate data"""
4642
data: Optional[str] = None
4743
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
4844
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None
@@ -63,15 +59,14 @@ def data_from_json(self, data: Any):
6359
"""Populate data field using any structure that could be converted to json"""
6460
self.data_from_text(data=self.json_serializer(data))
6561

66-
@property
6762
def asdict(self) -> Dict:
6863
r: Dict[str, Any] = {
6964
"recordId": self.record_id,
7065
"result": self.result,
7166
"data": self.data,
7267
}
7368
if self.metadata:
74-
r["metadata"] = self.metadata.asdict
69+
r["metadata"] = self.metadata.asdict()
7570
return r
7671

7772
@property
@@ -115,11 +110,12 @@ def add_record(self, record: KinesisFirehoseResponseRecord):
115110
else:
116111
self.records = [record]
117112

118-
@property
119113
def asdict(self) -> Dict:
114+
# make sure return size is less than 6MB
120115
if not self.records:
121116
return {}
122-
return {"records": [r.asdict for r in self.records]}
117+
118+
return {"records": [r.asdict() for r in self.records]}
123119

124120

125121
class KinesisFirehoseRecordMetadata(DictWrapper):
@@ -200,6 +196,16 @@ def create_firehose_response_record(
200196
result: Literal["Ok", "Dropped", "ProcessingFailed"],
201197
data: Optional[str] = None,
202198
) -> KinesisFirehoseResponseRecord:
199+
"""create a KinesisFirehoseResponseRecord directly using the record_id and given values
200+
Parameters
201+
----------
202+
result : Literal["Ok", "Dropped", "ProcessingFailed"]
203+
processing result, supported value: Ok, Dropped, ProcessingFailed
204+
data : str, optional
205+
data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
206+
use either function like `data_from_text`, `data_from_json` to populate data
207+
208+
"""
203209
return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data)
204210

205211

‎docs/utilities/data_classes.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,12 +981,23 @@ inline, and re-emit them back to the Delivery Stream.
981981
Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
982982
function to access the data either as json or plain text, depending on the original payload.
983983

984+
When constructing response to Firehose, You can utilize the `KinesisFirehoseResponse` class shown
985+
in the example below.
986+
984987
=== "app.py"
985988

986989
```python
987990
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
988991
```
989992

993+
You can also construct response without using `event_source` wrapper. Shown in the example below.
994+
995+
=== "app.py"
996+
997+
```python
998+
--8<-- "examples/event_sources/src/kinesis_firehose_response.py"
999+
```
1000+
9901001
### Lambda Function URL
9911002

9921003
=== "app.py"

‎examples/event_sources/src/kinesis_firehose_delivery_stream.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
1515
data = record.data_as_json
1616

1717
## do all kind of stuff with data
18+
## generate data to return
19+
new_data = {"tool_used": "powertools_dataclass", "original_payload": data}
1820

1921
processed_record = record.create_firehose_response_record(result="Ok")
20-
processed_record.data_from_json(data=data)
22+
processed_record.data_from_json(data=new_data)
2123

2224
result.add_record(processed_record)
2325

2426
# return transformed records
25-
return result.asdict
27+
return result.asdict()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import base64
2+
3+
from aws_lambda_powertools.utilities.data_classes import (
4+
KinesisFirehoseResponse,
5+
KinesisFirehoseResponseRecord,
6+
)
7+
8+
9+
def lambda_handler(event, context):
10+
result = KinesisFirehoseResponse()
11+
12+
for record in event["records"]:
13+
print(record["recordId"])
14+
payload = base64.b64decode(record["data"]).decode("utf-8")
15+
## do all kind of stuff with payload
16+
## generate data to return
17+
new_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
18+
19+
processed_record = KinesisFirehoseResponseRecord(record_id=record["recordId"], result="Ok")
20+
processed_record.data_from_text(data=new_data)
21+
result.add_record(processed_record)
22+
23+
# return transformed records
24+
return result.asdict()

‎tests/unit/data_classes/test_kinesis_firehose_response.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
KinesisFirehoseEvent,
33
KinesisFirehoseResponse,
44
KinesisFirehoseResponseRecord,
5+
KinesisFirehoseResponseRecordMetadata,
56
)
67
from tests.functional.utils import load_event
78

@@ -15,13 +16,15 @@ def test_kinesis_firehose_response():
1516
# if data was delivered as json; caches loaded value
1617
data = record.data_as_text
1718

19+
metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023})
1820
processed_record = KinesisFirehoseResponseRecord(
1921
record_id=record.record_id,
2022
result="Ok",
23+
metadata=metadata_partition,
2124
)
2225
processed_record.data_from_text(data=data)
2326
response.add_record(record=processed_record)
24-
response_dict = response.asdict
27+
response_dict = response.asdict()
2528

2629
res_records = list(response_dict["records"])
2730
assert len(res_records) == 2
@@ -30,6 +33,7 @@ def test_kinesis_firehose_response():
3033
assert record_01["result"] == "Ok"
3134
assert record_01["recordId"] == record01_raw["recordId"]
3235
assert record_01["data"] == record01_raw["data"]
36+
assert record_01["metadata"]["partitionKeys"]["year"] == 2023
3337

3438
assert response.records[0].data_as_bytes == b"Hello World"
3539
assert response.records[0].data_as_text == "Hello World"
@@ -49,7 +53,7 @@ def test_kinesis_firehose_create_response():
4953
)
5054
processed_record.data_from_text(data=data)
5155
response.add_record(record=processed_record)
52-
response_dict = response.asdict
56+
response_dict = response.asdict()
5357

5458
res_records = list(response_dict["records"])
5559
assert len(res_records) == 2

0 commit comments

Comments
 (0)
Please sign in to comment.