Skip to content

Commit 7dc7c34

Browse files
committed
implement parallel processing
1 parent daec77d commit 7dc7c34

File tree

6 files changed

+323
-104
lines changed

6 files changed

+323
-104
lines changed

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ public interface BatchMessageHandler<E, R> {
3333
* @param context The lambda context
3434
* @return A partial batch response
3535
*/
36-
public abstract R processBatch(E event, Context context);
36+
R processBatch(E event, Context context);
3737

38+
/**
39+
* Processes the given batch in parallel returning a partial batch
40+
* response indicating the success and failure of individual
41+
* messages within the batch. <br/>
42+
* Note that parallel processing is not always better than sequential processing,
43+
* and you should benchmark your code to determine the best approach for your use case. <br/>
44+
* Also note that to get more threads available (more vCPUs),
45+
* you need to increase the amount of memory allocated to your Lambda function. <br/>
46+
47+
*
48+
* @param event The Lambda event containing the batch to process
49+
* @param context The lambda context
50+
* @return A partial batch response
51+
*/
52+
R processBatchInParallel(E event, Context context);
3853
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java

+53-25
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.Optional;
2223
import java.util.function.BiConsumer;
2324
import java.util.function.Consumer;
25+
import java.util.stream.Collectors;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
28+
import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
2629

2730
/**
2831
* A batch message processor for DynamoDB Streams batches.
@@ -46,35 +49,60 @@ public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord>
4649

4750
@Override
4851
public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
49-
List<StreamsEventResponse.BatchItemFailure> batchFailures = new ArrayList<>();
52+
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
5053

51-
for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) {
52-
try {
54+
for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) {
55+
processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
56+
}
5357

54-
rawMessageHandler.accept(record, context);
55-
// Report success if we have a handler
56-
if (this.successHandler != null) {
57-
this.successHandler.accept(record);
58-
}
59-
} catch (Throwable t) {
60-
String sequenceNumber = record.getDynamodb().getSequenceNumber();
61-
LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
62-
sequenceNumber, t.getMessage());
63-
LOGGER.error("Error was", t);
64-
batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber));
65-
66-
// Report failure if we have a handler
67-
if (this.failureHandler != null) {
68-
// A failing failure handler is no reason to fail the batch
69-
try {
70-
this.failureHandler.accept(record, t);
71-
} catch (Throwable t2) {
72-
LOGGER.warn("failureHandler threw handling failure", t2);
73-
}
58+
return response;
59+
}
60+
61+
62+
@Override
63+
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) {
64+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
65+
66+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
67+
.parallelStream() // Parallel processing
68+
.map(eventRecord -> {
69+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
70+
return processBatchItem(eventRecord, context);
71+
})
72+
.filter(Optional::isPresent)
73+
.map(Optional::get)
74+
.collect(Collectors.toList());
75+
76+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
77+
}
78+
79+
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
80+
try {
81+
LOGGER.debug("Processing item {}", streamRecord.getEventID());
82+
83+
rawMessageHandler.accept(streamRecord, context);
84+
85+
// Report success if we have a handler
86+
if (this.successHandler != null) {
87+
this.successHandler.accept(streamRecord);
88+
}
89+
return Optional.empty();
90+
} catch (Throwable t) {
91+
String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber();
92+
LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
93+
sequenceNumber, t.getMessage());
94+
LOGGER.error("Error was", t);
95+
96+
// Report failure if we have a handler
97+
if (this.failureHandler != null) {
98+
// A failing failure handler is no reason to fail the batch
99+
try {
100+
this.failureHandler.accept(streamRecord, t);
101+
} catch (Throwable t2) {
102+
LOGGER.warn("failureHandler threw handling failure", t2);
74103
}
75104
}
105+
return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build());
76106
}
77-
78-
return new StreamsEventResponse(batchFailures);
79107
}
80108
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java

+59-32
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Optional;
2324
import java.util.function.BiConsumer;
2425
import java.util.function.Consumer;
26+
import java.util.stream.Collectors;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
29+
import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
2730
import software.amazon.lambda.powertools.utilities.EventDeserializer;
2831

2932
/**
@@ -57,42 +60,66 @@ public KinesisStreamsBatchMessageHandler(BiConsumer<KinesisEvent.KinesisEventRec
5760

5861
@Override
5962
public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
60-
List<StreamsEventResponse.BatchItemFailure> batchFailures = new ArrayList<>();
61-
62-
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
63-
try {
64-
if (this.rawMessageHandler != null) {
65-
rawMessageHandler.accept(record, context);
66-
} else {
67-
M messageDeserialized = EventDeserializer.extractDataFrom(record).as(messageClass);
68-
messageHandler.accept(messageDeserialized, context);
69-
}
63+
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
7064

71-
// Report success if we have a handler
72-
if (this.successHandler != null) {
73-
this.successHandler.accept(record);
74-
}
75-
} catch (Throwable t) {
76-
String sequenceNumber = record.getEventID();
77-
LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures",
78-
sequenceNumber, t.getMessage());
79-
LOGGER.error("Error was", t);
80-
81-
batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber()));
82-
83-
// Report failure if we have a handler
84-
if (this.failureHandler != null) {
85-
// A failing failure handler is no reason to fail the batch
86-
try {
87-
this.failureHandler.accept(record, t);
88-
} catch (Throwable t2) {
89-
LOGGER.warn("failureHandler threw handling failure", t2);
90-
}
65+
for (KinesisEvent.KinesisEventRecord eventRecord : event.getRecords()) {
66+
processBatchItem(eventRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
67+
}
68+
69+
return response;
70+
}
71+
72+
@Override
73+
public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) {
74+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
75+
76+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
77+
.parallelStream() // Parallel processing
78+
.map(eventRecord -> {
79+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
80+
return processBatchItem(eventRecord, context);
81+
})
82+
.filter(Optional::isPresent)
83+
.map(Optional::get)
84+
.collect(Collectors.toList());
85+
86+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
87+
}
88+
89+
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) {
90+
try {
91+
LOGGER.debug("Processing item {}", eventRecord.getEventID());
92+
93+
if (this.rawMessageHandler != null) {
94+
rawMessageHandler.accept(eventRecord, context);
95+
} else {
96+
M messageDeserialized = EventDeserializer.extractDataFrom(eventRecord).as(messageClass);
97+
messageHandler.accept(messageDeserialized, context);
98+
}
99+
100+
// Report success if we have a handler
101+
if (this.successHandler != null) {
102+
this.successHandler.accept(eventRecord);
103+
}
104+
return Optional.empty();
105+
} catch (Throwable t) {
106+
String sequenceNumber = eventRecord.getEventID();
107+
LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures",
108+
sequenceNumber, t.getMessage());
109+
LOGGER.error("Error was", t);
110+
111+
// Report failure if we have a handler
112+
if (this.failureHandler != null) {
113+
// A failing failure handler is no reason to fail the batch
114+
try {
115+
this.failureHandler.accept(eventRecord, t);
116+
} catch (Throwable t2) {
117+
LOGGER.warn("failureHandler threw handling failure", t2);
91118
}
92119
}
93-
}
94120

95-
return new StreamsEventResponse(batchFailures);
121+
return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build());
122+
}
96123
}
97124
}
98125

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java

+68-37
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
1919
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
2020
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2124
import java.util.function.BiConsumer;
2225
import java.util.function.Consumer;
26+
import java.util.stream.Collectors;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
29+
import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
2530
import software.amazon.lambda.powertools.utilities.EventDeserializer;
2631

2732
/**
@@ -61,57 +66,27 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
6166
// If we are working on a FIFO queue, when any message fails we should stop processing and return the
6267
// rest of the batch as failed too. We use this variable to track when that has happened.
6368
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
64-
boolean failWholeBatch = false;
69+
AtomicBoolean failWholeBatch = new AtomicBoolean(false);
6570

6671
int messageCursor = 0;
67-
for (; messageCursor < event.getRecords().size() && !failWholeBatch; messageCursor++) {
72+
for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) {
6873
SQSEvent.SQSMessage message = event.getRecords().get(messageCursor);
6974

7075
String messageGroupId = message.getAttributes() != null ?
7176
message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null;
7277

73-
try {
74-
if (this.rawMessageHandler != null) {
75-
rawMessageHandler.accept(message, context);
76-
} else {
77-
M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass);
78-
messageHandler.accept(messageDeserialized, context);
79-
}
80-
81-
// Report success if we have a handler
82-
if (this.successHandler != null) {
83-
this.successHandler.accept(message);
84-
}
85-
86-
} catch (Throwable t) {
87-
LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures",
88-
message.getMessageId(), t.getMessage());
89-
LOGGER.error("Error was", t);
90-
91-
response.getBatchItemFailures()
92-
.add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId())
93-
.build());
78+
processBatchItem(message, context).ifPresent(batchItemFailure -> {
79+
response.getBatchItemFailures().add(batchItemFailure);
9480
if (messageGroupId != null) {
95-
failWholeBatch = true;
81+
failWholeBatch.set(true);
9682
LOGGER.info(
9783
"A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
9884
, messageGroupId, message.getMessageId());
9985
}
100-
101-
// Report failure if we have a handler
102-
if (this.failureHandler != null) {
103-
// A failing failure handler is no reason to fail the batch
104-
try {
105-
this.failureHandler.accept(message, t);
106-
} catch (Throwable t2) {
107-
LOGGER.warn("failureHandler threw handling failure", t2);
108-
}
109-
}
110-
111-
}
86+
});
11287
}
11388

114-
if (failWholeBatch) {
89+
if (failWholeBatch.get()) {
11590
// Add the remaining messages to the batch item failures
11691
event.getRecords()
11792
.subList(messageCursor, event.getRecords().size())
@@ -121,4 +96,60 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
12196
}
12297
return response;
12398
}
99+
100+
@Override
101+
public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) {
102+
if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) {
103+
LOGGER.warn("FIFO queues are not supported in parallel mode, proceeding in sequence");
104+
return processBatch(event, context);
105+
}
106+
107+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
108+
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = event.getRecords()
109+
.parallelStream() // Parallel processing
110+
.map(sqsMessage -> {
111+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
112+
return processBatchItem(sqsMessage, context);
113+
})
114+
.filter(Optional::isPresent)
115+
.map(Optional::get)
116+
.collect(Collectors.toList());
117+
118+
return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build();
119+
}
120+
121+
private Optional<SQSBatchResponse.BatchItemFailure> processBatchItem(SQSEvent.SQSMessage message, Context context) {
122+
try {
123+
LOGGER.debug("Processing message {}", message.getMessageId());
124+
125+
if (this.rawMessageHandler != null) {
126+
rawMessageHandler.accept(message, context);
127+
} else {
128+
M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass);
129+
messageHandler.accept(messageDeserialized, context);
130+
}
131+
132+
// Report success if we have a handler
133+
if (this.successHandler != null) {
134+
this.successHandler.accept(message);
135+
}
136+
return Optional.empty();
137+
} catch (Throwable t) {
138+
LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures",
139+
message.getMessageId(), t.getMessage());
140+
LOGGER.error("Error was", t);
141+
142+
// Report failure if we have a handler
143+
if (this.failureHandler != null) {
144+
// A failing failure handler is no reason to fail the batch
145+
try {
146+
this.failureHandler.accept(message, t);
147+
} catch (Throwable t2) {
148+
LOGGER.warn("failureHandler threw handling failure", t2);
149+
}
150+
}
151+
return Optional.of(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId())
152+
.build());
153+
}
154+
}
124155
}

0 commit comments

Comments
 (0)