diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index c2072105d..95704b8a0 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -4,6 +4,9 @@ description: Utility
 ---
 
 The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
+The utility handles batch processing for both
+[standard](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) and
+[FIFO](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) SQS queues.
 
 **Key Features**
 
@@ -177,8 +180,11 @@ Both have nearly the same behaviour when it comes to processing messages from th
 * **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
 * **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will:
     - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
-    - **2)** if, non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
-    - **3)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
+    - **2)** If a message with a [message group ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) fails,
+      processing of the batch is stopped and the remaining messages are returned to SQS.
+      This behaviour [is required to handle SQS FIFO queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting).
+    - **3)** If non-retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated with the source SQS queue, or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
+    - **4)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
 
 The only difference is that **SqsUtils Utility API** will give you access to the return values from the processed messages if you need them. Exception `SQSBatchProcessingException` thrown from the utility will have access to both successful and failed messages along with failure exceptions.
 
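For illustration only (not part of this change set): a minimal handler sketch showing how the documented behaviour is consumed through the existing `SqsUtils.batchProcessor` API. The class name `FifoBatchHandler` and the uppercasing logic are assumptions, not code from this PR; only `batchProcessor` and `SQSBatchProcessingException` come from the library itself.

// Sketch under assumptions: the function is subscribed to an SQS (FIFO) queue and
// uppercasing the body stands in for real business logic.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

import java.util.List;

import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor;

public class FifoBatchHandler implements RequestHandler<SQSEvent, String> {

    @Override
    public String handleRequest(SQSEvent event, Context context) {
        // Successful messages are deleted via sqs:DeleteMessageBatch; if any message
        // fails, SQSBatchProcessingException is raised so the failed message (and, on a
        // FIFO batch, everything after the first failure) returns to the queue.
        List<String> results = batchProcessor(event, message -> message.getBody().toUpperCase());
        return String.join(",", results);
    }
}

The point of the sketch is that callers do not change anything to benefit from the FIFO handling below: the stop-on-first-failure behaviour is applied automatically whenever a record carries a message group ID.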
diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
index 5eafcc8d3..618aa948c 100644
--- a/powertools-sqs/pom.xml
+++ b/powertools-sqs/pom.xml
@@ -45,6 +45,10 @@
             <groupId>software.amazon.lambda</groupId>
             <artifactId>powertools-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-tests</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-lambda-java-core</artifactId>
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
index 3461c6755..9fff4dc6f 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
@@ -15,7 +15,9 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -26,6 +28,7 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.lambda.powertools.sqs.exception.SkippedMessageDueToFailedBatchException;
 import software.amazon.lambda.powertools.sqs.internal.BatchContext;
 import software.amazon.payloadoffloading.PayloadS3Pointer;
 import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
@@ -43,6 +46,9 @@ public final class SqsUtils {
     private static SqsClient client;
     private static S3Client s3Client;
 
+    // The attribute on an SQS-FIFO message used to record the message group ID
+    private static final String MESSAGE_GROUP_ID = "MessageGroupId";
+
     private SqsUtils() {
     }
 
@@ -490,19 +496,52 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
         }
 
         BatchContext batchContext = new BatchContext(client);
-
-        for (SQSMessage message : event.getRecords()) {
+        int offset = 0;
+        boolean failedBatch = false;
+        while (offset < event.getRecords().size() && !failedBatch) {
+            // Get the current message and advance to the next. Doing this here
+            // makes it easier for us to know where we are up to if we have to
+            // break out of here early.
+            SQSMessage message = event.getRecords().get(offset);
+            offset++;
+
+            // If the batch hasn't failed, try to process the message
             try {
                 handlerReturn.add(handler.process(message));
                 batchContext.addSuccess(message);
             } catch (Exception e) {
+
+                // Record the failure
                 batchContext.addFailure(message, e);
+
+                // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure
+                // now stops us from processing the rest of the batch; we break out of the loop, leaving unprocessed
+                // messages in the queue.
+                // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
+                String messageGroupId = message.getAttributes() != null ?
+                        message.getAttributes().get(MESSAGE_GROUP_ID) : null;
+                if (messageGroupId != null) {
+                    LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too",
+                            messageGroupId, message.getMessageId());
+                    failedBatch = true;
+                }
                 LOG.error("Encountered issue processing message: {}", message.getMessageId(), e);
             }
         }
 
-        batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
+        // If we have a FIFO batch failure, unprocessed messages will remain on the queue
+        // past the failed message. We have to add these to the errors.
+        if (offset < event.getRecords().size()) {
+            event.getRecords()
+                    .subList(offset, event.getRecords().size())
+                    .forEach(message -> {
+                        LOG.info("Skipping message {} as another message with a message group failed in this batch",
+                                message.getMessageId());
+                        batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException());
+                    });
+        }
+        batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
         return handlerReturn;
     }
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java
new file mode 100644
index 000000000..9dbb66509
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java
@@ -0,0 +1,12 @@
+package software.amazon.lambda.powertools.sqs.exception;
+
+/**
+ * Indicates that a message has been skipped due to the batch it is
+ * within failing.
+ */
+public class SkippedMessageDueToFailedBatchException extends Exception {
+
+    public SkippedMessageDueToFailedBatchException() {
+    }
+
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java
new file mode 100644
index 000000000..cfa79dc36
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java
@@ -0,0 +1,166 @@
+package software.amazon.lambda.powertools.sqs;
+
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.lambda.runtime.tests.EventLoader;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.*;
+import org.mockito.quality.Strictness;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.MockitoAnnotations.openMocks;
+import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor;
+import static software.amazon.lambda.powertools.sqs.SqsUtils.overrideSqsClient;
+
+public class SqsUtilsFifoBatchProcessorTest {
+
+    private static SQSEvent sqsBatchEvent;
+    private MockitoSession session;
+
+    @Mock
+    private SqsClient sqsClient;
+
+    @Captor
+    private ArgumentCaptor<DeleteMessageBatchRequest> deleteMessageBatchCaptor;
+
+    public SqsUtilsFifoBatchProcessorTest() throws IOException {
+        sqsBatchEvent = EventLoader.loadSQSEvent("SqsFifoBatchEvent.json");
+    }
+
+    @BeforeEach
+    public void setup() {
+        // Establish a strict mocking session. This means that any
+        // calls to a mock that have not been stubbed will throw
+        this.session = Mockito.mockitoSession()
+                .strictness(Strictness.STRICT_STUBS)
+                .initMocks(this)
+                .startMocking();
+
+        overrideSqsClient(sqsClient);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        session.finishMocking();
+    }
+
+    @Test
+    public void processWholeBatch() {
+        // Act
+        AtomicInteger processedCount = new AtomicInteger();
+        List<Boolean> results = batchProcessor(sqsBatchEvent, false, (message) -> {
+            processedCount.getAndIncrement();
+            return true;
+        });
+
+        // Assert
+        assertThat(processedCount.get()).isEqualTo(3);
+        assertThat(results.size()).isEqualTo(3);
+    }
+
+    /**
+     * Check that a failure in the middle of the batch:
+     * - deletes the successful message explicitly from SQS
+     * - marks the failed and subsequent message as failed
+     * - does not delete the failed or subsequent message
+     */
+    @Test
+    public void singleFailureInMiddleOfBatch() {
+        // Arrange
+        Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
+                .builder().build());
+
+        // Act
+        AtomicInteger processedCount = new AtomicInteger();
+        assertThatExceptionOfType(SQSBatchProcessingException.class)
+                .isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
+                    int value = processedCount.getAndIncrement();
+                    if (value == 1) {
+                        throw new RuntimeException("Whoops");
+                    }
+                    return true;
+                }))
+
+                // Assert
+                .isInstanceOf(SQSBatchProcessingException.class)
+                .satisfies(e -> {
+                    List<SQSEvent.SQSMessage> failures = ((SQSBatchProcessingException) e).getFailures();
+                    assertThat(failures.size()).isEqualTo(2);
+                    List<String> failureIds = failures.stream()
+                            .map(SQSEvent.SQSMessage::getMessageId)
+                            .collect(Collectors.toList());
+                    assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(1).getMessageId());
+                    assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(2).getMessageId());
+                });
+
+        DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
+        List<String> messageIds = deleteRequest.entries().stream()
+                .map(DeleteMessageBatchRequestEntry::id)
+                .collect(Collectors.toList());
+        assertThat(deleteRequest.entries().size()).isEqualTo(1);
+        assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();
+
+    }
+
+    @Test
+    public void singleFailureAtEndOfBatch() {
+
+        // Arrange
+        Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
+                .builder().build());
+
+        // Act
+        AtomicInteger processedCount = new AtomicInteger();
+        assertThatExceptionOfType(SQSBatchProcessingException.class)
+                .isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
+                    int value = processedCount.getAndIncrement();
+                    if (value == 2) {
+                        throw new RuntimeException("Whoops");
+                    }
+                    return true;
+                }));
+
+        // Assert
+        DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
+        List<String> messageIds = deleteRequest.entries().stream()
+                .map(DeleteMessageBatchRequestEntry::id)
+                .collect(Collectors.toList());
+        assertThat(deleteRequest.entries().size()).isEqualTo(2);
+        assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();
+        assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(1).getMessageId())).isTrue();
+        assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(2).getMessageId())).isFalse();
+    }
+
+    @Test
+    public void messageFailureStopsGroupProcessing() {
+        String groupToFail = sqsBatchEvent.getRecords().get(0).getAttributes().get("MessageGroupId");
+
+        assertThatExceptionOfType(SQSBatchProcessingException.class)
+                .isThrownBy(() -> batchProcessor(sqsBatchEvent, (message) -> {
+                    String groupId = message.getAttributes().get("MessageGroupId");
+                    if (groupId.equals(groupToFail)) {
+                        throw new RuntimeException("Failed processing");
+                    }
+                    return groupId;
+                }))
+                .satisfies(e -> {
+                    assertThat(e.successMessageReturnValues().size()).isEqualTo(0);
+                    assertThat(e.successMessageReturnValues().contains(groupToFail)).isFalse();
+                });
+    }
+
+}
diff --git a/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json
new file mode 100644
index 000000000..726e45ea1
--- /dev/null
+++ b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json
@@ -0,0 +1,73 @@
+{
+  "Records": [
+    {
+      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
+      "body": "Test message.",
+      "attributes": {
+        "ApproximateReceiveCount": "1",
+        "SentTimestamp": "1545082649183",
+        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+        "ApproximateFirstReceiveTimestamp": "1545082649185",
+        "MessageGroupId": "Group1"
+      },
+      "messageAttributes": {},
+      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+      "eventSource": "aws:sqs",
+      "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
+      "awsRegion": "us-east-2"
+    },
+    {
+      "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
+      "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+      "body": "Test message.",
+      "attributes": {
+        "ApproximateReceiveCount": "1",
+        "SentTimestamp": "1545082650636",
+        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+        "ApproximateFirstReceiveTimestamp": "1545082650649",
+        "MessageGroupId": "Group2"
+      },
+      "messageAttributes": {
+        "Attribute3": {
+          "binaryValue": "MTEwMA==",
+          "dataType": "Binary"
+        },
+        "Attribute2": {
+          "stringValue": "123",
+          "dataType": "Number"
+        },
+        "Attribute1": {
+          "stringValue": "AttributeValue1",
+          "dataType": "String"
+        }
+      },
+      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+      "eventSource": "aws:sqs",
+      "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
+      "awsRegion": "us-east-2"
+    },
+    {
+      "messageId": "2e1424d4-f796-459a-9696-9c92662ba5da",
+      "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+      "body": "Test message.",
+      "attributes": {
+        "ApproximateReceiveCount": "1",
+        "SentTimestamp": "1545082650636",
+        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+        "ApproximateFirstReceiveTimestamp": "1545082650649",
+        "MessageGroupId": "Group1"
+      },
+      "messageAttributes": {
+        "Attribute2": {
+          "stringValue": "123",
+          "dataType": "Number"
+        }
+      },
+      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+      "eventSource": "aws:sqs",
+      "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
+      "awsRegion": "us-east-2"
+    }
+  ]
+}
\ No newline at end of file
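For illustration only (not part of this change set): a rough sketch of how the FIFO skip behaviour exercised by the tests above looks from the caller's side. The class name `FifoBatchCaller`, the `handle` method, and the body-length logic are assumptions; `batchProcessor`, `SQSBatchProcessingException.getFailures()`, and `SkippedMessageDueToFailedBatchException` are the pieces introduced or used by this diff.

// Sketch under assumptions: the handler lambda is a stand-in for real processing logic.
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

import java.util.List;

import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;

import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor;

public class FifoBatchCaller {

    public static void handle(SQSEvent event) {
        try {
            // Returns one value per successfully processed record.
            List<Integer> bodyLengths = batchProcessor(event, message -> message.getBody().length());
            System.out.println("Processed " + bodyLengths.size() + " records");
        } catch (SQSBatchProcessingException e) {
            // Successful records have already been deleted with sqs:DeleteMessageBatch.
            // getFailures() contains the failed record plus every later record in the
            // FIFO batch, which SqsUtils marked with SkippedMessageDueToFailedBatchException.
            for (SQSEvent.SQSMessage failed : e.getFailures()) {
                System.out.println("Returned to the queue: " + failed.getMessageId());
            }
        }
    }
}

With the fixture above, failing the second record also fails the third: the batch stops at the first failure so that per-group ordering is preserved when the messages are redelivered.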