1
1
import base64
2
2
import json
3
- from dataclasses import dataclass
3
+ from dataclasses import dataclass , field
4
4
from typing import Any , Callable , Dict , Iterator , List , Optional
5
5
6
6
from typing_extensions import Literal
9
9
10
10
11
11
@dataclass
12
- class KinesisFirehoseResponseRecordMetadata :
12
+ class KinesisFirehoseDataTransformationRecordMetadata :
13
13
"""
14
14
Documentation:
15
15
--------------
@@ -18,14 +18,14 @@ class KinesisFirehoseResponseRecordMetadata:
18
18
19
19
partition_keys : Optional [Dict [str , str ]]
20
20
21
- def asdict (self ) -> Optional [ Dict ] :
21
+ def asdict (self ) -> Dict :
22
22
if self .partition_keys is not None :
23
23
return {"partitionKeys" : self .partition_keys }
24
- return None
24
+ return {}
25
25
26
26
27
27
@dataclass
28
- class KinesisFirehoseResponseRecord :
28
+ class KinesisFirehoseDataTransformationRecord :
29
29
"""Record in Kinesis Data Firehose response object
30
30
31
31
Documentation:
@@ -36,13 +36,13 @@ class KinesisFirehoseResponseRecord:
36
36
# Record ID; uniquely identifies this record within the current batch"""
37
37
record_id : str
38
38
# Processing result, supported value: Ok, Dropped, ProcessingFailed"""
39
- result : Literal ["Ok" , "Dropped" , "ProcessingFailed" ]
39
+ result : Literal ["Ok" , "Dropped" , "ProcessingFailed" ] = "Ok"
40
40
# data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
41
41
# use either function like `data_from_text`, `data_from_json` to populate data"""
42
42
data : Optional [str ] = None
43
43
# Optional: Metadata associated with this record; can contain partition keys
44
44
# See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
45
- metadata : Optional [KinesisFirehoseResponseRecordMetadata ] = None
45
+ metadata : Optional [KinesisFirehoseDataTransformationRecordMetadata ] = None
46
46
_json_data : Optional [Any ] = None
47
47
json_serializer : Callable = json .dumps
48
48
json_deserializer : Callable = json .loads
@@ -70,31 +70,31 @@ def asdict(self) -> Dict:
70
70
return r
71
71
72
72
@property
73
- def data_as_bytes (self ) -> Optional [ bytes ] :
73
+ def data_as_bytes (self ) -> bytes :
74
74
"""Decoded base64-encoded data as bytes"""
75
75
if not self .data :
76
- return None
76
+ return b""
77
77
return base64 .b64decode (self .data )
78
78
79
79
@property
80
- def data_as_text (self ) -> Optional [ str ] :
80
+ def data_as_text (self ) -> str :
81
81
"""Decoded base64-encoded data as text"""
82
- if not self .data_as_bytes :
83
- return None
82
+ if not self .data :
83
+ return ""
84
84
return self .data_as_bytes .decode ("utf-8" )
85
85
86
86
@property
87
- def data_as_json (self ) -> Optional [ Dict ] :
87
+ def data_as_json (self ) -> Dict :
88
88
"""Decoded base64-encoded data loaded to json"""
89
- if not self .data_as_text :
90
- return None
89
+ if not self .data :
90
+ return {}
91
91
if self ._json_data is None :
92
92
self ._json_data = self .json_deserializer (self .data_as_text )
93
93
return self ._json_data
94
94
95
95
96
96
@dataclass
97
- class KinesisFirehoseResponse :
97
+ class KinesisFirehoseDataTransformationResponse :
98
98
"""Kinesis Data Firehose response object
99
99
100
100
Documentation:
@@ -108,13 +108,10 @@ class KinesisFirehoseResponse:
108
108
optional parameter at start. can be added later using `add_record` function.
109
109
"""
110
110
111
- records : Optional [ List [KinesisFirehoseResponseRecord ]] = None
111
+ records : List [KinesisFirehoseDataTransformationRecord ] = field ( default_factory = list )
112
112
113
- def add_record (self , record : KinesisFirehoseResponseRecord ):
114
- if self .records :
115
- self .records .append (record )
116
- else :
117
- self .records = [record ]
113
+ def add_record (self , record : KinesisFirehoseDataTransformationRecord ):
114
+ self .records .append (record )
118
115
119
116
def asdict (self ) -> Dict :
120
117
if not self .records :
@@ -196,12 +193,12 @@ def data_as_json(self) -> dict:
196
193
self ._json_data = self ._json_deserializer (self .data_as_text )
197
194
return self ._json_data
198
195
199
- def create_firehose_response_record (
196
+ def build_data_transformation_response (
200
197
self ,
201
- result : Literal ["Ok" , "Dropped" , "ProcessingFailed" ],
198
+ result : Literal ["Ok" , "Dropped" , "ProcessingFailed" ] = "Ok" ,
202
199
data : Optional [str ] = None ,
203
- metadata : Optional [KinesisFirehoseResponseRecordMetadata ] = None ,
204
- ) -> KinesisFirehoseResponseRecord :
200
+ metadata : Optional [KinesisFirehoseDataTransformationRecordMetadata ] = None ,
201
+ ) -> KinesisFirehoseDataTransformationRecord :
205
202
"""create a KinesisFirehoseResponseRecord directly using the record_id and given values
206
203
Parameters
207
204
----------
@@ -214,7 +211,12 @@ def create_firehose_response_record(
214
211
Metadata associated with this record; can contain partition keys
215
212
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
216
213
"""
217
- return KinesisFirehoseResponseRecord (record_id = self .record_id , result = result , data = data , metadata = metadata )
214
+ return KinesisFirehoseDataTransformationRecord (
215
+ record_id = self .record_id ,
216
+ result = result ,
217
+ data = data ,
218
+ metadata = metadata ,
219
+ )
218
220
219
221
220
222
class KinesisFirehoseEvent (DictWrapper ):
0 commit comments