Skip to content

docs(batch): new visuals and error handling section #2857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 changes: 3 additions & 0 deletions .gitleaksignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
examples/batch_processing/src/context_manager_access_output_pydantic.txt:aws-access-token:10
examples/batch_processing/src/context_manager_access_output_pydantic.txt:aws-access-token:15
examples/batch_processing/src/context_manager_access_output.txt:aws-access-token:10
274 changes: 248 additions & 26 deletions docs/utilities/batch.md

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions examples/batch_processing/src/context_manager_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ def record_handler(record: SQSRecord):
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
batch = event["Records"]
batch = event["Records"] # (1)!
with processor(records=batch, handler=record_handler):
processed_messages: List[Tuple] = processor.process()

for message in processed_messages:
status: Literal["success"] | Literal["fail"] = message[0]
cause: str = message[1] # (2)!
record: SQSRecord = message[2]

logger.info(status, record=record)
logger.info(status, record=record, cause=cause)

return processor.response()
12 changes: 12 additions & 0 deletions examples/batch_processing/src/context_manager_access_output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[
(
"fail",
"<class 'Exception': Failed to process record", # (1)!
<aws_lambda_powertools.utilities.data_classes.sqs_event.SQSRecord object at 0x103c590a0>
),
(
"success",
"success",
{'messageId': '88891c36-32eb-4a25-9905-654a32916893', 'receiptHandle': 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', 'body': 'success', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1545082649183', 'SenderId': 'AIDAIENQZJOLO23YVJ4VO', 'ApproximateFirstReceiveTimestamp': '1545082649185'}, 'messageAttributes': {}, 'md5OfBody': 'e4e68fb7bd0e697a0ae8f1bb342846b3', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-east-2:123456789012:my-queue', 'awsRegion': 'us-east-1'}
)
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[
(
"fail", # (1)!
"<class 'pydantic.error_wrappers.ValidationError'>:1 validation error for OrderSqs\nbody\n JSON object must be str, bytes or bytearray (type=type_error.json)",
<aws_lambda_powertools.utilities.data_classes.sqs_event.SQSRecord object at 0x103c590a0>
),
(
"success",
"success",
{'messageId': '88891c36-32eb-4a25-9905-654a32916893', 'receiptHandle': 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', 'body': 'success', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1545082649183', 'SenderId': 'AIDAIENQZJOLO23YVJ4VO', 'ApproximateFirstReceiveTimestamp': '1545082649185'}, 'messageAttributes': {}, 'md5OfBody': 'e4e68fb7bd0e697a0ae8f1bb342846b3', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-east-2:123456789012:my-queue', 'awsRegion': 'us-east-1'}
),
(
"fail", # (2)!
"<class 'Exception'>:Failed to process record.",
OrderSqs(messageId='9d0bfba5-d213-4b64-89bd-f4fbd7e58358', receiptHandle='AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', body=Order(item={'type': 'fail'}), attributes=SqsAttributesModel(ApproximateReceiveCount='1', ApproximateFirstReceiveTimestamp=datetime.datetime(2018, 12, 17, 21, 37, 29, 185000, tzinfo=datetime.timezone.utc), MessageDeduplicationId=None, MessageGroupId=None, SenderId='AIDAIENQZJOLO23YVJ4VO', SentTimestamp=datetime.datetime(2018, 12, 17, 21, 37, 29, 183000, tzinfo=datetime.timezone.utc), SequenceNumber=None, AWSTraceHeader=None), messageAttributes={}, md5OfBody='e4e68fb7bd0e697a0ae8f1bb342846b3', md5OfMessageAttributes=None, eventSource='aws:sqs', eventSourceARN='arn:aws:sqs:us-east-2:123456789012:my-queue', awsRegion='us-east-1')
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
processor = BatchProcessor(event_type=EventType.DynamoDBStreams) # (1)!
tracer = Tracer()
logger = Logger()

Expand Down
35 changes: 35 additions & 0 deletions examples/batch_processing/src/getting_started_error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
BatchProcessor,
EventType,
process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# Module-level singletons so they are reused across warm Lambda invocations.
processor = BatchProcessor(event_type=EventType.SQS)  # batch processor configured for standard SQS events
tracer = Tracer()
logger = Logger()


class InvalidPayload(Exception):
    """Raised by the record handler when an SQS record's body is empty.

    Any exception raised inside ``record_handler`` marks that record as a
    partial failure; this type merely gives the failure a descriptive name.
    """

    pass


@tracer.capture_method
def record_handler(record: SQSRecord):
    """Process a single SQS record, failing it when the body is empty.

    Logs the raw body; an empty/falsy body raises ``InvalidPayload`` so the
    batch processor records this message as a partial failure. # (1)!
    """
    body: str = record.body
    logger.info(body)
    if body:
        return
    raise InvalidPayload("Payload does not contain minimum information to be processed.")  # (1)!


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Lambda entry point: run the batch through the SQS processor.

    Delegates per-record handling to ``record_handler`` and returns the
    partial-batch response (``batchItemFailures``) expected by Lambda. # (2)!
    """
    response = process_partial_response(  # (2)!
        event=event, record_handler=record_handler, processor=processor, context=context
    )
    return response
2 changes: 1 addition & 1 deletion examples/batch_processing/src/getting_started_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
processor = BatchProcessor(event_type=EventType.KinesisDataStreams) # (1)!
tracer = Tracer()
logger = Logger()

Expand Down
19 changes: 10 additions & 9 deletions examples/batch_processing/src/getting_started_sqs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
BatchProcessor,
Expand All @@ -9,20 +7,23 @@
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
processor = BatchProcessor(event_type=EventType.SQS) # (1)!
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.body
if payload:
item: dict = json.loads(payload)
logger.info(item)
def record_handler(record: SQSRecord): # (2)!
payload: str = record.json_body # if json string data, otherwise record.body for str
logger.info(payload)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
return process_partial_response( # (3)!
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)
10 changes: 3 additions & 7 deletions examples/batch_processing/src/getting_started_sqs_fifo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
SqsFifoPartialProcessor,
Expand All @@ -8,17 +6,15 @@
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = SqsFifoPartialProcessor()
processor = SqsFifoPartialProcessor() # (1)!
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.body
if payload:
item: dict = json.loads(payload)
logger.info(item)
payload: str = record.json_body # if json string data, otherwise record.body for str
logger.info(payload)


@logger.inject_lambda_context
Expand Down