-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathextending_processor_handlers.py
44 lines (34 loc) · 1.52 KB
/
extending_processor_handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import json
from typing import Any, Dict
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.batch import (
BatchProcessor,
EventType,
ExceptionInfo,
FailureResponse,
process_partial_response,
)
from aws_lambda_powertools.utilities.batch.base import SuccessResponse
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext
class MyProcessor(BatchProcessor):
    """Batch processor that counts per-record outcomes as metrics.

    Each handler adds a single count to the module-level ``metrics`` object
    and then defers to the parent class implementation, whose return value
    is passed back unchanged.
    """

    def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
        """Count one failed record, then delegate to the parent handler.

        Args:
            record: The record whose processing failed.
            exception: Exception information for the failure.

        Returns:
            Whatever the parent ``failure_handler`` returns.
        """
        # The metric is emitted before delegating, so it is recorded even if
        # the parent handler raises.
        metrics.add_metric(name="BatchRecordFailures", value=1, unit=MetricUnit.Count)
        failure_entry = super().failure_handler(record, exception)
        return failure_entry

    def success_handler(self, record: Dict[str, Any], result: Any) -> SuccessResponse:
        """Count one successful record, then delegate to the parent handler.

        Args:
            record: The record that was processed successfully.
            result: The value produced for that record.

        Returns:
            Whatever the parent ``success_handler`` returns.
        """
        # Same ordering as the failure path: metric first, then delegate.
        metrics.add_metric(name="BatchRecordSuccesses", value=1, unit=MetricUnit.Count)
        success_entry = super().success_handler(record, result)
        return success_entry
# Module-level objects used by the handlers in this file.

# Processor configured for SQS events; lambda_handler passes it to
# process_partial_response.
processor = MyProcessor(event_type=EventType.SQS)
# Metrics sink under the "test" namespace; MyProcessor's handlers add counts
# to it, and it also supplies the log_metrics decorator on lambda_handler.
metrics = Metrics(namespace="test")
# Logger used by record_handler to log each decoded payload.
logger = Logger()
# Tracer supplying the capture_method decorator applied to record_handler.
tracer = Tracer()
@tracer.capture_method
def record_handler(record: SQSRecord):
    """Log the JSON-decoded body of a single SQS record.

    A record whose body is falsy (for example an empty string) is skipped:
    nothing is parsed or logged. A body that is not valid JSON makes
    ``json.loads`` raise; the exception is not caught here.

    Args:
        record: The SQS record to handle; only its ``body`` is read.

    Returns:
        None in every case.
    """
    body: str = record.body
    if not body:
        # Nothing to decode for this record.
        return
    decoded: dict = json.loads(body)
    logger.info(decoded)
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context: LambdaContext):
    """Lambda entry point: process the incoming batch record by record.

    Hands the event to ``process_partial_response`` together with the
    module-level ``processor`` and ``record_handler``, and returns its
    result unchanged.

    Args:
        event: The raw Lambda event carrying the batch.
        context: The Lambda context object for this invocation.

    Returns:
        The value returned by ``process_partial_response``.
    """
    batch_response = process_partial_response(
        event=event,
        context=context,
        processor=processor,
        record_handler=record_handler,
    )
    return batch_response