Skip to content

Commit 789b91b

Browse files
committed
document parallel processing
1 parent d9e809c commit 789b91b

File tree

1 file changed

+60
-29
lines changed

1 file changed

+60
-29
lines changed

docs/utilities/batch.md

+60-29
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ stateDiagram-v2
3030

3131
* Reports batch item failures to reduce number of retries for a record upon errors
3232
* Simple interface to process each batch record
33+
* Parallel processing of batches
3334
* Integrates with Java Events library and the deserialization module
3435
* Build your own batch processor by extending primitives
3536

@@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration
110111
while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of
111112
your Lambda handler.
112113

113-
A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found
114-
[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering
115-
all of the batch sources.
116-
117-
For more information on configuring `ReportBatchItemFailures`,
118-
see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting),
119-
[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and
120-
[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).
121-
114+
A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources.
122115

116+
For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).
123117

124118

125119
!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
@@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
150144
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
151145
return handler.processBatch(sqsEvent, context);
152146
}
153-
154-
147+
155148
private void processMessage(Product p, Context c) {
156149
// Process the product
157150
}
158-
159151
}
160152
```
161153

@@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
276268
private void processMessage(Product p, Context c) {
277269
// process the product
278270
}
279-
280271
}
281272
```
282273

@@ -475,6 +466,46 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
475466
}
476467
```
477468

469+
## Parallel processing
470+
You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()`
471+
instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed
472+
in parallel rather than sequentially.
473+
474+
This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be
475+
used with SQS FIFO. In that case, items will be processed sequentially, even with the `processBatchInParallel` method.
476+
477+
!!! warning
478+
Note that parallel processing is not always better than sequential processing,
479+
and you should benchmark your code to determine the best approach for your use case.
480+
481+
!!! info
482+
To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function.
483+
While the exact vCPU allocation isn't published, from observing common patterns customers see an allocation of one vCPU per 1024 MB of memory.
484+
485+
=== "Example with SQS"
486+
487+
```java hl_lines="13"
488+
public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
489+
490+
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
491+
492+
public SqsBatchHandler() {
493+
handler = new BatchMessageHandlerBuilder()
494+
.withSqsBatchHandler()
495+
.buildWithMessageHandler(this::processMessage, Product.class);
496+
}
497+
498+
@Override
499+
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
500+
return handler.processBatchInParallel(sqsEvent, context);
501+
}
502+
503+
private void processMessage(Product p, Context c) {
504+
// Process the product
505+
}
506+
}
507+
```
508+
478509

479510
## Handling Messages
480511

@@ -490,7 +521,7 @@ In general, the deserialized message handler should be used unless you need acce
490521

491522
=== "Raw Message Handler"
492523

493-
```java
524+
```java hl_lines="4 7"
494525
public void setup() {
495526
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
496527
.withSqsBatchHandler()
@@ -505,7 +536,7 @@ In general, the deserialized message handler should be used unless you need acce
505536

506537
=== "Deserialized Message Handler"
507538

508-
```java
539+
```java hl_lines="4 7"
509540
public void setup() {
510541
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
511542
.withSqsBatchHandler()
@@ -529,20 +560,20 @@ provide a custom failure handler.
529560
Handlers can be provided when building the batch processor and are available for all event sources.
530561
For instance for DynamoDB:
531562

532-
```java
533-
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
534-
.withDynamoDbBatchHandler()
535-
.withSuccessHandler((m) -> {
536-
// Success handler receives the raw message
537-
LOGGER.info("Message with sequenceNumber {} was successfully processed",
538-
m.getDynamodb().getSequenceNumber());
539-
})
540-
.withFailureHandler((m, e) -> {
541-
// Failure handler receives the raw message and the exception thrown.
542-
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
543-
, e.getDynamodb().getSequenceNumber(), e);
544-
})
545-
.buildWithMessageHander(this::processMessage);
563+
```java hl_lines="3 8"
564+
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
565+
.withDynamoDbBatchHandler()
566+
.withSuccessHandler((m) -> {
567+
// Success handler receives the raw message
568+
LOGGER.info("Message with sequenceNumber {} was successfully processed",
569+
m.getDynamodb().getSequenceNumber());
570+
})
571+
.withFailureHandler((m, e) -> {
572+
// Failure handler receives the raw message and the exception thrown.
573+
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
574+
, e.getDynamodb().getSequenceNumber(), e);
575+
})
576+
.buildWithMessageHander(this::processMessage);
546577
```
547578

548579
!!! info

0 commit comments

Comments
 (0)