Skip to content

Commit 1b13512

Browse files
Adding documentation
1 parent a8d8c9d commit 1b13512

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

docs/utilities/batch.md

+34-6
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,9 @@ Processing batches from SQS works in three stages:
143143

144144
When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, you should know that a batch may include messages from different group IDs.
145145

146-
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
146+
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
147147

148-
By default, message processing halts after the initial failure, returning all failed and unprocessed messages in `batchItemFailures` to preserve the ordering of messages in your queue. However, customers can opt to continue processing messages and retrieve failed messages within a message group ID by setting `return_on_first_error` to False.
149-
150-
???+ notice "Having problems with DLQ?"
151-
`AsyncBatchProcessor` uses `asyncio.gather`. This might cause [side effects and reach trace limits at high concurrency](../core/tracer.md#concurrent-asynchronous-functions){target="_blank"}.
148+
Enable the `skip_group_on_error` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
152149

153150
=== "Recommended"
154151

@@ -170,6 +167,12 @@ By default, message processing halts after the initial failure, returning all fa
170167
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
171168
```
172169

170+
=== "Enabling skip_group_on_error flag"
171+
172+
```python hl_lines="2-6 9 23"
173+
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py"
174+
```
175+
173176
### Processing messages from Kinesis
174177

175178
Processing batches from Kinesis works in three stages:
@@ -317,7 +320,7 @@ sequenceDiagram
317320

318321
> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
319322
320-
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
323+
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skip_group_on_error` flag.
321324

322325
<center>
323326
```mermaid
@@ -341,6 +344,31 @@ sequenceDiagram
341344
<i>SQS FIFO mechanism with Batch Item Failures</i>
342345
</center>
343346

347+
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skip_group_on_error` flag.
348+
349+
<center>
350+
```mermaid
351+
sequenceDiagram
352+
autonumber
353+
participant SQS queue
354+
participant Lambda service
355+
participant Lambda function
356+
Lambda service->>SQS queue: Poll
357+
Lambda service->>Lambda function: Invoke (batch event)
358+
activate Lambda function
359+
Lambda function-->Lambda function: Process 2 out of 10 batch items
360+
Lambda function--xLambda function: Fail on 3rd batch item
361+
Lambda function-->Lambda function: Process messages from another MessageGroupID
362+
Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
363+
deactivate Lambda function
364+
activate SQS queue
365+
Lambda service->>SQS queue: Delete successful messages processed
366+
SQS queue-->>SQS queue: Failed messages return
367+
deactivate SQS queue
368+
```
369+
<i>SQS FIFO mechanism with Batch Item Failures</i>
370+
</center>
371+
344372
#### Kinesis and DynamoDB Streams
345373

346374
> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
SqsFifoPartialProcessor,
4+
process_partial_response,
5+
)
6+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
processor = SqsFifoPartialProcessor(skip_group_on_error=True)
10+
tracer = Tracer()
11+
logger = Logger()
12+
13+
14+
@tracer.capture_method
15+
def record_handler(record: SQSRecord):
16+
payload: str = record.json_body # if json string data, otherwise record.body for str
17+
logger.info(payload)
18+
19+
20+
@logger.inject_lambda_context
21+
@tracer.capture_lambda_handler
22+
def lambda_handler(event, context: LambdaContext):
23+
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)

0 commit comments

Comments
 (0)