|
| 1 | +from json import JSONDecodeError |
| 2 | +from typing import Dict |
| 3 | + |
1 | 4 | from aws_lambda_powertools.utilities.data_classes import (
|
2 | 5 | KinesisFirehoseDataTransformationRecord,
|
3 | 6 | KinesisFirehoseDataTransformationResponse,
|
4 | 7 | KinesisFirehoseEvent,
|
| 8 | + event_source, |
5 | 9 | )
|
6 | 10 | from aws_lambda_powertools.utilities.serialization import base64_from_json
|
7 | 11 | from aws_lambda_powertools.utilities.typing import LambdaContext
|
8 | 12 |
|
9 | 13 |
|
10 |
| -def lambda_handler(event: dict, context: LambdaContext): |
11 |
| - firehose_event = KinesisFirehoseEvent(event) |
| 14 | +@event_source(data_class=KinesisFirehoseEvent) |
| 15 | +def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): |
12 | 16 | result = KinesisFirehoseDataTransformationResponse()
|
13 | 17 |
|
14 |
| - for record in firehose_event.records: |
| 18 | + for record in event.records: |
15 | 19 | try:
|
16 |
| - payload = record.data_as_text # base64 decoded data as str |
17 |
| - ## do all kind of stuff with payload |
| 20 | + payload: Dict = record.data_as_json # decodes and deserialize base64 JSON string |
| 21 | + |
18 | 22 | ## generate data to return
|
19 | 23 | transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
|
20 |
| - # Default result is Ok |
| 24 | + |
21 | 25 | processed_record = KinesisFirehoseDataTransformationRecord(
|
22 | 26 | record_id=record.record_id,
|
23 | 27 | data=base64_from_json(transformed_data),
|
24 | 28 | )
|
25 |
| - except Exception: |
26 |
| - # encountered failure that couldn't be fixed by retry |
| 29 | + except JSONDecodeError: # (1)! |
| 30 | + # our producers ingest JSON payloads only; drop malformed records from the stream |
27 | 31 | processed_record = KinesisFirehoseDataTransformationRecord(
|
28 | 32 | record_id=record.record_id,
|
29 | 33 | data=record.data,
|
|
0 commit comments