Skip to content

Commit a19def3

Browse files
authored
fix: Handle batch failures in FIFO queues correctly (#1183)
1 parent 5ba9cf4 commit a19def3

File tree

6 files changed

+305
-5
lines changed

6 files changed

+305
-5
lines changed

docs/utilities/batch.md

+8-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ description: Utility
44
---
55

66
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
7+
The utility handles batch processing for both
8+
[standard](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) and
9+
[FIFO](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) SQS queues.
710

811
**Key Features**
912

@@ -177,8 +180,11 @@ Both have nearly the same behaviour when it comes to processing messages from th
177180
* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
178181
* **Entire batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will:
179182
- **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
180-
- **2)** if, non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
181-
- **3)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
183+
- **2)** If a message with a [message group ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) fails,
184+
the processing of the batch will be stopped and the remainder of the messages will be returned to SQS.
185+
This behaviour [is required to handle SQS FIFO queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting).
186+
- **3)** If non-retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated with the source SQS queue, or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
187+
- **4)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
182188

183189
The only difference is that **SqsUtils Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the
184190
utility will have access to both successful and failed messages along with the failure exceptions.

powertools-sqs/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
<groupId>software.amazon.lambda</groupId>
4646
<artifactId>powertools-core</artifactId>
4747
</dependency>
48+
<dependency>
49+
<groupId>com.amazonaws</groupId>
50+
<artifactId>aws-lambda-java-tests</artifactId>
51+
</dependency>
4852
<dependency>
4953
<groupId>com.amazonaws</groupId>
5054
<artifactId>aws-lambda-java-core</artifactId>

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
import java.lang.reflect.Constructor;
1717
import java.util.ArrayList;
18+
import java.util.LinkedList;
1819
import java.util.List;
20+
import java.util.Queue;
1921
import java.util.function.Function;
2022
import java.util.stream.Collectors;
2123

@@ -26,6 +28,7 @@
2628
import org.slf4j.LoggerFactory;
2729
import software.amazon.awssdk.services.s3.S3Client;
2830
import software.amazon.awssdk.services.sqs.SqsClient;
31+
import software.amazon.lambda.powertools.sqs.exception.SkippedMessageDueToFailedBatchException;
2932
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
3033
import software.amazon.payloadoffloading.PayloadS3Pointer;
3134
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
@@ -43,6 +46,9 @@ public final class SqsUtils {
4346
private static SqsClient client;
4447
private static S3Client s3Client;
4548

49+
// The attribute on an SQS-FIFO message used to record the message group ID
50+
private static final String MESSAGE_GROUP_ID = "MessageGroupId";
51+
4652
private SqsUtils() {
4753
}
4854

@@ -490,19 +496,52 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
490496
}
491497

492498
BatchContext batchContext = new BatchContext(client);
493-
494-
for (SQSMessage message : event.getRecords()) {
499+
int offset = 0;
500+
boolean failedBatch = false;
501+
while (offset < event.getRecords().size() && !failedBatch) {
502+
// Get the current message and advance to the next. Doing this here
503+
// makes it easier for us to know where we are up to if we have to
504+
// break out of here early.
505+
SQSMessage message = event.getRecords().get(offset);
506+
offset++;
507+
508+
// If the batch hasn't failed, try process the message
495509
try {
496510
handlerReturn.add(handler.process(message));
497511
batchContext.addSuccess(message);
498512
} catch (Exception e) {
513+
514+
// Record the failure
499515
batchContext.addFailure(message, e);
516+
517+
// If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure
518+
// now stops us from processing the rest of the batch; we break out of the loop leaving unprocessed
519+
// messages in the queue
520+
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
521+
String messageGroupId = message.getAttributes() != null ?
522+
message.getAttributes().get(MESSAGE_GROUP_ID) : null;
523+
if (messageGroupId != null) {
524+
LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
525+
, messageGroupId, message.getMessageId());
526+
failedBatch = true;
527+
}
500528
LOG.error("Encountered issue processing message: {}", message.getMessageId(), e);
501529
}
502530
}
503531

504-
batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
532+
// If we have a FIFO batch failure, unprocessed messages will remain on the queue
533+
// past the failed message. We have to add these to the errors
534+
if (offset < event.getRecords().size()) {
535+
event.getRecords()
536+
.subList(offset, event.getRecords().size())
537+
.forEach(message -> {
538+
LOG.info("Skipping message {} as another message with a message group failed in this batch",
539+
message.getMessageId());
540+
batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException());
541+
});
542+
}
505543

544+
batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
506545
return handlerReturn;
507546
}
508547

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package software.amazon.lambda.powertools.sqs.exception;
2+
3+
/**
4+
* Indicates that a message has been skipped due to the batch it is
5+
* within failing.
6+
*/
7+
public class SkippedMessageDueToFailedBatchException extends Exception {
8+
9+
public SkippedMessageDueToFailedBatchException() {
10+
}
11+
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
4+
import com.amazonaws.services.lambda.runtime.tests.EventLoader;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import org.junit.jupiter.api.AfterEach;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
import org.mockito.*;
10+
import org.mockito.quality.Strictness;
11+
import software.amazon.awssdk.services.sqs.SqsClient;
12+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
13+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
14+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.stream.Collectors;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
23+
import static org.mockito.Mockito.verifyNoMoreInteractions;
24+
import static org.mockito.MockitoAnnotations.openMocks;
25+
import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor;
26+
import static software.amazon.lambda.powertools.sqs.SqsUtils.overrideSqsClient;
27+
28+
public class SqsUtilsFifoBatchProcessorTest {
29+
30+
private static SQSEvent sqsBatchEvent;
31+
private MockitoSession session;
32+
33+
@Mock
34+
private SqsClient sqsClient;
35+
36+
@Captor
37+
private ArgumentCaptor<DeleteMessageBatchRequest> deleteMessageBatchCaptor;
38+
39+
public SqsUtilsFifoBatchProcessorTest() throws IOException {
40+
sqsBatchEvent = EventLoader.loadSQSEvent("SqsFifoBatchEvent.json");
41+
}
42+
43+
@BeforeEach
44+
public void setup() {
45+
// Establish a strict mocking session. This means that any
46+
// calls to a mock that have not been stubbed will throw
47+
this.session = Mockito.mockitoSession()
48+
.strictness(Strictness.STRICT_STUBS)
49+
.initMocks(this)
50+
.startMocking();
51+
52+
overrideSqsClient(sqsClient);
53+
}
54+
55+
@AfterEach
56+
public void tearDown() {
57+
session.finishMocking();
58+
}
59+
60+
@Test
61+
public void processWholeBatch() {
62+
// Act
63+
AtomicInteger processedCount = new AtomicInteger();
64+
List<Object> results = batchProcessor(sqsBatchEvent, false, (message) -> {
65+
processedCount.getAndIncrement();
66+
return true;
67+
});
68+
69+
// Assert
70+
assertThat(processedCount.get()).isEqualTo(3);
71+
assertThat(results.size()).isEqualTo(3);
72+
}
73+
74+
/**
75+
* Check that a failure in the middle of the batch:
76+
* - deletes the successful message explicitly from SQS
77+
* - marks the failed and subsequent message as failed
78+
* - does not delete the failed or subsequent message
79+
*/
80+
@Test
81+
public void singleFailureInMiddleOfBatch() {
82+
// Arrange
83+
Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
84+
.builder().build());
85+
86+
// Act
87+
AtomicInteger processedCount = new AtomicInteger();
88+
assertThatExceptionOfType(SQSBatchProcessingException.class)
89+
.isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
90+
int value = processedCount.getAndIncrement();
91+
if (value == 1) {
92+
throw new RuntimeException("Whoops");
93+
}
94+
return true;
95+
}))
96+
97+
// Assert
98+
.isInstanceOf(SQSBatchProcessingException.class)
99+
.satisfies(e -> {
100+
List<SQSEvent.SQSMessage> failures = ((SQSBatchProcessingException)e).getFailures();
101+
assertThat(failures.size()).isEqualTo(2);
102+
List<String> failureIds = failures.stream()
103+
.map(SQSEvent.SQSMessage::getMessageId)
104+
.collect(Collectors.toList());
105+
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(1).getMessageId());
106+
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(2).getMessageId());
107+
});
108+
109+
DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
110+
List<String> messageIds = deleteRequest.entries().stream()
111+
.map(DeleteMessageBatchRequestEntry::id)
112+
.collect(Collectors.toList());
113+
assertThat(deleteRequest.entries().size()).isEqualTo(1);
114+
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();
115+
116+
}
117+
118+
@Test
119+
public void singleFailureAtEndOfBatch() {
120+
121+
// Arrange
122+
Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
123+
.builder().build());
124+
125+
126+
// Act
127+
AtomicInteger processedCount = new AtomicInteger();
128+
assertThatExceptionOfType(SQSBatchProcessingException.class)
129+
.isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
130+
int value = processedCount.getAndIncrement();
131+
if (value == 2) {
132+
throw new RuntimeException("Whoops");
133+
}
134+
return true;
135+
}));
136+
137+
// Assert
138+
DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
139+
List<String> messageIds = deleteRequest.entries().stream()
140+
.map(DeleteMessageBatchRequestEntry::id)
141+
.collect(Collectors.toList());
142+
assertThat(deleteRequest.entries().size()).isEqualTo(2);
143+
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();
144+
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(1).getMessageId())).isTrue();
145+
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(2).getMessageId())).isFalse();
146+
}
147+
148+
@Test
149+
public void messageFailureStopsGroupProcessing() {
150+
String groupToFail = sqsBatchEvent.getRecords().get(0).getAttributes().get("MessageGroupId");
151+
152+
assertThatExceptionOfType(SQSBatchProcessingException.class)
153+
.isThrownBy(() -> batchProcessor(sqsBatchEvent, (message) -> {
154+
String groupId = message.getAttributes().get("MessageGroupId");
155+
if (groupId.equals(groupToFail)) {
156+
throw new RuntimeException("Failed processing");
157+
}
158+
return groupId;
159+
}))
160+
.satisfies(e -> {
161+
assertThat(e.successMessageReturnValues().size()).isEqualTo(0);
162+
assertThat(e.successMessageReturnValues().contains(groupToFail)).isFalse();
163+
});
164+
}
165+
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
5+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
6+
"body": "Test message.",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"SentTimestamp": "1545082649183",
10+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
11+
"ApproximateFirstReceiveTimestamp": "1545082649185",
12+
"MessageGroupId": "Group1"
13+
},
14+
"messageAttributes": {},
15+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
16+
"eventSource": "aws:sqs",
17+
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
18+
"awsRegion": "us-east-2"
19+
},
20+
{
21+
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
22+
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
23+
"body": "Test message.",
24+
"attributes": {
25+
"ApproximateReceiveCount": "1",
26+
"SentTimestamp": "1545082650636",
27+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
28+
"ApproximateFirstReceiveTimestamp": "1545082650649",
29+
"MessageGroupId": "Group2"
30+
},
31+
"messageAttributes": {
32+
"Attribute3" : {
33+
"binaryValue" : "MTEwMA==",
34+
"dataType" : "Binary"
35+
},
36+
"Attribute2" : {
37+
"stringValue" : "123",
38+
"dataType" : "Number"
39+
},
40+
"Attribute1" : {
41+
"stringValue" : "AttributeValue1",
42+
"dataType" : "String"
43+
}
44+
},
45+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
46+
"eventSource": "aws:sqs",
47+
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
48+
"awsRegion": "us-east-2"
49+
},
50+
{
51+
"messageId": "2e1424d4-f796-459a-9696-9c92662ba5da",
52+
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
53+
"body": "Test message.",
54+
"attributes": {
55+
"ApproximateReceiveCount": "1",
56+
"SentTimestamp": "1545082650636",
57+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
58+
"ApproximateFirstReceiveTimestamp": "1545082650649",
59+
"MessageGroupId": "Group1"
60+
},
61+
"messageAttributes": {
62+
"Attribute2" : {
63+
"stringValue" : "123",
64+
"dataType" : "Number"
65+
}
66+
},
67+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
68+
"eventSource": "aws:sqs",
69+
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
70+
"awsRegion": "us-east-2"
71+
}
72+
]
73+
}

0 commit comments

Comments
 (0)