Skip to content

Commit d1fc1c5

Browse files
committed
add drop example
1 parent f2d5e63 commit d1fc1c5

File tree

5 files changed

+61
-59
lines changed

5 files changed

+61
-59
lines changed

docs/utilities/data_classes.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -989,10 +989,6 @@ in the example below.
989989
```python
990990
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
991991
```
992-
=== "with Failure"
993-
```python hl_lines="25"
994-
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py"
995-
```
996992

997993
You can also construct response without using `event_source` wrapper. Shown in the example below.
998994

@@ -1001,10 +997,14 @@ You can also construct response without using `event_source` wrapper. Shown in t
1001997
```python
1002998
--8<-- "examples/event_sources/src/kinesis_firehose_response.py"
1003999
```
1004-
=== "with Exception"
1005-
```python hl_lines="26"
1000+
=== "with Failure"
1001+
```python hl_lines="30"
10061002
--8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py"
10071003
```
1004+
=== "with Dropped"
1005+
```python hl_lines="30"
1006+
--8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py"
1007+
```
10081008

10091009
### Lambda Function URL
10101010

examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

examples/event_sources/src/kinesis_firehose_response.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
import base64
2-
31
from aws_lambda_powertools.utilities.data_classes import (
42
KinesisFirehoseDataTransformationRecord,
53
KinesisFirehoseDataTransformationResponse,
4+
KinesisFirehoseEvent,
65
)
76
from aws_lambda_powertools.utilities.serialization import base64_from_json
87
from aws_lambda_powertools.utilities.typing import LambdaContext
98

109

1110
def lambda_handler(event: dict, context: LambdaContext):
11+
firehose_event = KinesisFirehoseEvent(event)
1212
result = KinesisFirehoseDataTransformationResponse()
1313

14-
for record in event["records"]:
15-
print(record["recordId"])
16-
payload = base64.b64decode(record["data"]).decode("utf-8")
17-
## do all kind of stuff with payload
14+
for record in firehose_event.records:
15+
payload = record.data_as_text # base64 decoded data as str
16+
1817
## generate data to return
1918
transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
2019

2120
processed_record = KinesisFirehoseDataTransformationRecord(
22-
record_id=record["recordId"],
21+
record_id=record.record_id,
2322
data=base64_from_json(transformed_data),
2423
)
24+
2525
result.add_record(processed_record)
2626

2727
# return transformed records
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from aws_lambda_powertools.utilities.data_classes import (
2+
KinesisFirehoseDataTransformationRecord,
3+
KinesisFirehoseDataTransformationResponse,
4+
KinesisFirehoseEvent,
5+
)
6+
from aws_lambda_powertools.utilities.serialization import base64_from_json
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
10+
def lambda_handler(event: dict, context: LambdaContext):
11+
firehose_event = KinesisFirehoseEvent(event)
12+
result = KinesisFirehoseDataTransformationResponse()
13+
14+
for record in firehose_event.records:
15+
try:
16+
payload = record.data_as_text # base64 decoded data as str
17+
## do all kind of stuff with payload
18+
## generate data to return
19+
transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
20+
# Default result is Ok
21+
processed_record = KinesisFirehoseDataTransformationRecord(
22+
record_id=record.record_id,
23+
data=base64_from_json(transformed_data),
24+
)
25+
except Exception:
26+
# encountered failure that couldn't be fixed by retry
27+
processed_record = KinesisFirehoseDataTransformationRecord(
28+
record_id=record.record_id,
29+
data=record.data,
30+
result="Dropped",
31+
)
32+
33+
result.add_record(processed_record)
34+
35+
# return transformed records
36+
return result.asdict()

examples/event_sources/src/kinesis_firehose_response_exception.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,35 @@
1-
import base64
2-
31
from aws_lambda_powertools.utilities.data_classes import (
42
KinesisFirehoseDataTransformationRecord,
53
KinesisFirehoseDataTransformationResponse,
4+
KinesisFirehoseEvent,
65
)
76
from aws_lambda_powertools.utilities.serialization import base64_from_json
87
from aws_lambda_powertools.utilities.typing import LambdaContext
98

109

1110
def lambda_handler(event: dict, context: LambdaContext):
11+
firehose_event = KinesisFirehoseEvent(event)
1212
result = KinesisFirehoseDataTransformationResponse()
1313

14-
for record in event["records"]:
15-
print(record["recordId"])
14+
for record in firehose_event.records:
1615
try:
17-
payload = base64.b64decode(record["data"]).decode("utf-8")
16+
payload = record.data_as_text # base64 decoded data as str
1817
## do all kind of stuff with payload
1918
## generate data to return
2019
transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
21-
except Exception:
22-
# add Failed result to processing results and send back to kinesis
20+
# Default result is Ok
2321
processed_record = KinesisFirehoseDataTransformationRecord(
24-
record_id=record["recordId"],
22+
record_id=record.record_id,
2523
data=base64_from_json(transformed_data),
24+
)
25+
except Exception:
26+
# add Failed result to processing results, send back to kinesis for retry
27+
processed_record = KinesisFirehoseDataTransformationRecord(
28+
record_id=record.record_id,
29+
data=record.data,
2630
result="ProcessingFailed",
2731
)
28-
result.add_record(processed_record)
29-
continue
3032

31-
# Default result is Ok
32-
processed_record = KinesisFirehoseDataTransformationRecord(
33-
record_id=record["recordId"],
34-
data=base64_from_json(transformed_data),
35-
)
3633
result.add_record(processed_record)
3734

3835
# return transformed records

0 commit comments

Comments
 (0)