From a5a0a8251a4fce316faa0c85d85d7029308e8461 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 8 Jun 2023 13:13:29 +0800 Subject: [PATCH 01/15] Sketch out batch processing failures for FIFO queues --- .../lambda/powertools/sqs/SqsUtils.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 9ae81b77d..5608312db 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -43,6 +43,9 @@ public final class SqsUtils { private static SqsClient client; private static S3Client s3Client; + // The attribute on an SQS-FIFO message used to record the message group ID + private static final String MessageGroupIdKey = "MessageGroupId"; + private SqsUtils() { } @@ -491,12 +494,28 @@ public static List batchProcessor(final SQSEvent event, BatchContext batchContext = new BatchContext(client); + List failedMessageGroupIds = new ArrayList<>(); // Track which message groups have failed for (SQSMessage message : event.getRecords()) { + + // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue, + // and should not process it if a previous message in the same group in this batch has failed. + String messageGroupId = message.getAttributes() != null? + message.getAttributes().get(MessageGroupIdKey) : null; + try { - handlerReturn.add(handler.process(message)); - batchContext.addSuccess(message); + if (messageGroupId != null && failedMessageGroupIds.contains(messageGroupId)) { + batchContext.addFailure(message, null); // TODO - is this sensible + } else { + handlerReturn.add(handler.process(message)); + batchContext.addSuccess(message); + } } catch (Exception e) { batchContext.addFailure(message, e); + + // Record that this messageGroupId has a failure + if (messageGroupId != null) { + failedMessageGroupIds.add(messageGroupId); + } LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); } } From d0699df0584a78a38ef6d0cf31dba661eb63ca97 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 8 Jun 2023 14:28:04 +0800 Subject: [PATCH 02/15] Write some tests --- .../lambda/powertools/sqs/SqsUtils.java | 8 +- .../sqs/SqsUtilsFifoBatchProcessorTest.java | 63 ++++++++++++++++ .../src/test/resources/SqsFifoBatchEvent.json | 73 +++++++++++++++++++ 3 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java create mode 100644 powertools-sqs/src/test/resources/SqsFifoBatchEvent.json diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 5608312db..a70c94ad0 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -503,8 +503,12 @@ public static List batchProcessor(final SQSEvent event, message.getAttributes().get(MessageGroupIdKey) : null; try { - if (messageGroupId != null && failedMessageGroupIds.contains(messageGroupId)) { - batchContext.addFailure(message, null); // TODO - is this sensible + if (messageGroupId == null && failedMessageGroupIds.contains(messageGroupId)) { + // TODO - do we need to record that we're skipping this? If we add it to the failure + // list it's a bit weird as it's not a failure .... + // batchContext.addFailure(message, null); + LOG.info("Skipping messge {} as another message in messageGroup {} failed in this batch", + message.getMessageId(), messageGroupId); } else { handlerReturn.add(handler.process(message)); batchContext.addSuccess(message); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java new file mode 100644 index 000000000..c40da2116 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -0,0 +1,63 @@ +package software.amazon.lambda.powertools.sqs; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import software.amazon.awssdk.services.sqs.SqsClient; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.MockitoAnnotations.openMocks; +import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor; +import static software.amazon.lambda.powertools.sqs.SqsUtils.overrideSqsClient; + +public class SqsUtilsFifoBatchProcessorTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static SQSEvent sqsBatchEvent; + + @Mock + private SqsClient sqsClient; + + public SqsUtilsFifoBatchProcessorTest() throws IOException { + openMocks(this); + overrideSqsClient(sqsClient); + sqsBatchEvent = MAPPER.readValue(SqsUtilsFifoBatchProcessorTest.class.getResource("/SqsFifoBatchEvent.json"), SQSEvent.class); + } + + @Test + public void processWholeBatch() { + AtomicInteger processedCount = new AtomicInteger(); + List results = batchProcessor(sqsBatchEvent, false, (message) -> { + processedCount.getAndIncrement(); + return true; + }); + + assertThat(processedCount.get()).isEqualTo(3); + assertThat(results.size()).isEqualTo(3); + } + + @Test + public void messageFailureStopsGroupProcessing() { + String groupToFail = sqsBatchEvent.getRecords().get(0).getAttributes().get("MessageGroupId"); + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> batchProcessor(sqsBatchEvent, (message) -> { + String groupId = message.getAttributes().get("MessageGroupId"); + if (groupId.equals(groupToFail)) { + throw new RuntimeException("Failed processing"); + } + return groupId; + })) + .satisfies(e -> { + assertThat(e.successMessageReturnValues().size()).isEqualTo(1); + assertThat(e.successMessageReturnValues().contains(groupToFail)).isFalse(); + }); + } + +} diff --git a/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json new file mode 100644 index 000000000..a66dacd60 --- /dev/null +++ b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json @@ -0,0 +1,73 @@ +{ + "records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185", + "MessageGroupId": "Group1" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + }, + { + "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649", + "MessageGroupId": "Group2" + }, + "messageAttributes": { + "Attribute3" : { + "binaryValue" : "MTEwMA==", + "dataType" : "Binary" + }, + "Attribute2" : { + "stringValue" : "123", + "dataType" : "Number" + }, + "Attribute1" : { + "stringValue" : "AttributeValue1", + "dataType" : "String" + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + }, + { + "messageId": "2e1424d4-f796-459a-9696-9c92662ba5da", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649", + "MessageGroupId": "Group1" + }, + "messageAttributes": { + "Attribute2" : { + "stringValue" : "123", + "dataType" : "Number" + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file From 44c7128ee7404b66238f5f6a56ee332730717d11 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 8 Jun 2023 14:31:35 +0800 Subject: [PATCH 03/15] Fix bug --- .../java/software/amazon/lambda/powertools/sqs/SqsUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index a70c94ad0..332a0cb33 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -503,11 +503,11 @@ public static List batchProcessor(final SQSEvent event, message.getAttributes().get(MessageGroupIdKey) : null; try { - if (messageGroupId == null && failedMessageGroupIds.contains(messageGroupId)) { + if (messageGroupId != null && failedMessageGroupIds.contains(messageGroupId)) { // TODO - do we need to record that we're skipping this? If we add it to the failure // list it's a bit weird as it's not a failure .... // batchContext.addFailure(message, null); - LOG.info("Skipping messge {} as another message in messageGroup {} failed in this batch", + LOG.info("Skipping message {} as another message in messageGroup {} failed in this batch", message.getMessageId(), messageGroupId); } else { handlerReturn.add(handler.process(message)); From 4c061e4868bbe624fcce2b208188c9d8f3ac053b Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 8 Jun 2023 14:42:49 +0800 Subject: [PATCH 04/15] Report skipped messages as failures --- .../amazon/lambda/powertools/sqs/SqsUtils.java | 9 ++++++--- .../SkippedMessageDueToFailedBatchException.java | 13 +++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 332a0cb33..c81e83f1f 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.lambda.powertools.sqs.exception.SkippedMessageDueToFailedBatchException; import software.amazon.lambda.powertools.sqs.internal.BatchContext; import software.amazon.payloadoffloading.PayloadS3Pointer; import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect; @@ -504,11 +505,13 @@ public static List batchProcessor(final SQSEvent event, try { if (messageGroupId != null && failedMessageGroupIds.contains(messageGroupId)) { - // TODO - do we need to record that we're skipping this? If we add it to the failure - // list it's a bit weird as it's not a failure .... - // batchContext.addFailure(message, null); LOG.info("Skipping message {} as another message in messageGroup {} failed in this batch", message.getMessageId(), messageGroupId); + + // We need to add a failure for this message, so that we report back to Lambda that this + // message should be retried + // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting + batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException(messageGroupId)); } else { handlerReturn.add(handler.process(message)); batchContext.addSuccess(message); diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java new file mode 100644 index 000000000..b75b2735b --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java @@ -0,0 +1,13 @@ +package software.amazon.lambda.powertools.sqs.exception; + +public class SkippedMessageDueToFailedBatchException extends Exception { + private final String messageGroupId; + + public SkippedMessageDueToFailedBatchException(String messageGroupId) { + this.messageGroupId = messageGroupId; + } + + public String getMessageGroupId() { + return messageGroupId; + } +} From 4d5bd9affbfea05f0d7398647c65c1eb0246d047 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 15 Jun 2023 10:55:13 +0800 Subject: [PATCH 05/15] Clean up and refactor a bit --- .../lambda/powertools/sqs/SqsUtils.java | 54 ++++++++++--------- ...ippedMessageDueToFailedBatchException.java | 10 ++-- .../sqs/SqsUtilsFifoBatchProcessorTest.java | 40 +++++++++++++- 3 files changed, 72 insertions(+), 32 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index c81e83f1f..7299405a7 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -493,37 +493,41 @@ public static List batchProcessor(final SQSEvent event, client = SqsClient.create(); } + // If we are working on a FIFO queue, when any message fails we should stop processing and return the + // rest of the batch as failed too. We use this variable to track when that has happened. + // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting + boolean failWholeBatch = false; + BatchContext batchContext = new BatchContext(client); - List failedMessageGroupIds = new ArrayList<>(); // Track which message groups have failed for (SQSMessage message : event.getRecords()) { - - // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue, - // and should not process it if a previous message in the same group in this batch has failed. - String messageGroupId = message.getAttributes() != null? - message.getAttributes().get(MessageGroupIdKey) : null; - - try { - if (messageGroupId != null && failedMessageGroupIds.contains(messageGroupId)) { - LOG.info("Skipping message {} as another message in messageGroup {} failed in this batch", - message.getMessageId(), messageGroupId); - - // We need to add a failure for this message, so that we report back to Lambda that this - // message should be retried - // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException(messageGroupId)); - } else { + // If the batch has already failed, we mark each message past the failure point as failed + if (failWholeBatch) { + LOG.info("Skipping message {} as another message with a message group failed in this batch", + message.getMessageId()); + batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); + } else { + // If the batch hasn't failed, try process the message + try { handlerReturn.add(handler.process(message)); batchContext.addSuccess(message); + } catch(Exception e){ + + // Record the failure + batchContext.addFailure(message, e); + + // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure + // now stops us from processing the rest of the batch. + String messageGroupId = message.getAttributes() != null ? + message.getAttributes().get(MessageGroupIdKey) : null; + + if (messageGroupId != null) { + failWholeBatch = true; + LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" + , messageGroupId, message.getMessageId()); + } + LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); } - } catch (Exception e) { - batchContext.addFailure(message, e); - - // Record that this messageGroupId has a failure - if (messageGroupId != null) { - failedMessageGroupIds.add(messageGroupId); - } - LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); } } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java index b75b2735b..280cfe568 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java @@ -1,13 +1,11 @@ package software.amazon.lambda.powertools.sqs.exception; +/** + * Thrown to indicate that a + */ public class SkippedMessageDueToFailedBatchException extends Exception { - private final String messageGroupId; - public SkippedMessageDueToFailedBatchException(String messageGroupId) { - this.messageGroupId = messageGroupId; + public SkippedMessageDueToFailedBatchException() { } - public String getMessageGroupId() { - return messageGroupId; - } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java index c40da2116..8a1851e9b 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -3,8 +3,12 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import java.io.IOException; import java.util.List; @@ -24,6 +28,9 @@ public class SqsUtilsFifoBatchProcessorTest { @Mock private SqsClient sqsClient; + @Mock + private ArgumentCaptor deleteMessageCaptor; + public SqsUtilsFifoBatchProcessorTest() throws IOException { openMocks(this); overrideSqsClient(sqsClient); @@ -32,16 +39,47 @@ public SqsUtilsFifoBatchProcessorTest() throws IOException { @Test public void processWholeBatch() { + // Act AtomicInteger processedCount = new AtomicInteger(); List results = batchProcessor(sqsBatchEvent, false, (message) -> { processedCount.getAndIncrement(); return true; }); + // Assert assertThat(processedCount.get()).isEqualTo(3); assertThat(results.size()).isEqualTo(3); } +// @Test +// public void singleFailureAtEndOfBatch() { +// +// // Arrange +// Mockito.when(sqsClient.deleteMessageBatch(deleteMessageCaptor.capture())).thenReturn(DeleteMessageBatchResponse +// .builder().build()); +// +// // Act +// AtomicInteger processedCount = new AtomicInteger(); +// List results = batchProcessor(sqsBatchEvent, false, (message) -> { +// int value = processedCount.getAndIncrement(); +// if (value == 2) { +// throw new RuntimeException("Whoops"); +// } +// return true; +// }); +// +// // Assert +// // - two messages processed +// assertThat(processedCount.get()).isEqualTo(2); +// assertThat(results.size()).isEqualTo(2); +// +// // - one message explicitly deleted +// assertThat(deleteMessageCaptor.getValue().entries().size()).isEqualTo(1); +// assertThat(deleteMessageCaptor.getValue().entries().get(0).id()) +// .isEqualTo(sqsBatchEvent.getRecords().get(sqsBatchEvent.getRecords().size() -1).getMessageId()); +// +// } + @Test public void messageFailureStopsGroupProcessing() { String groupToFail = sqsBatchEvent.getRecords().get(0).getAttributes().get("MessageGroupId"); @@ -55,7 +93,7 @@ public void messageFailureStopsGroupProcessing() { return groupId; })) .satisfies(e -> { - assertThat(e.successMessageReturnValues().size()).isEqualTo(1); + assertThat(e.successMessageReturnValues().size()).isEqualTo(0); assertThat(e.successMessageReturnValues().contains(groupToFail)).isFalse(); }); } From bac258164197ce165234d55fed51fc67d872d4cb Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 15 Jun 2023 11:36:25 +0800 Subject: [PATCH 06/15] Better testing --- .../sqs/SqsUtilsFifoBatchProcessorTest.java | 91 ++++++++++++------- 1 file changed, 56 insertions(+), 35 deletions(-) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java index 8a1851e9b..a5fba686d 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -2,20 +2,24 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.Mockito; +import org.mockito.*; +import org.mockito.quality.Strictness; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.MockitoAnnotations.openMocks; import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor; import static software.amazon.lambda.powertools.sqs.SqsUtils.overrideSqsClient; @@ -24,19 +28,35 @@ public class SqsUtilsFifoBatchProcessorTest { private static final ObjectMapper MAPPER = new ObjectMapper(); private static SQSEvent sqsBatchEvent; + private MockitoSession session; @Mock private SqsClient sqsClient; - @Mock - private ArgumentCaptor deleteMessageCaptor; + @Captor + private ArgumentCaptor deleteMessageBatchCaptor; public SqsUtilsFifoBatchProcessorTest() throws IOException { - openMocks(this); - overrideSqsClient(sqsClient); sqsBatchEvent = MAPPER.readValue(SqsUtilsFifoBatchProcessorTest.class.getResource("/SqsFifoBatchEvent.json"), SQSEvent.class); } + @BeforeEach + public void setup() { + // Establish a strict mocking session. This means that any + // calls to a mock that have not been stubbed will throw + this.session = Mockito.mockitoSession() + .strictness(Strictness.STRICT_STUBS) + .initMocks(this) + .startMocking(); + + overrideSqsClient(sqsClient); + } + + @AfterEach + public void tearDown() { + session.finishMocking(); + } + @Test public void processWholeBatch() { // Act @@ -51,34 +71,35 @@ public void processWholeBatch() { assertThat(results.size()).isEqualTo(3); } -// @Test -// public void singleFailureAtEndOfBatch() { -// -// // Arrange -// Mockito.when(sqsClient.deleteMessageBatch(deleteMessageCaptor.capture())).thenReturn(DeleteMessageBatchResponse -// .builder().build()); -// -// // Act -// AtomicInteger processedCount = new AtomicInteger(); -// List results = batchProcessor(sqsBatchEvent, false, (message) -> { -// int value = processedCount.getAndIncrement(); -// if (value == 2) { -// throw new RuntimeException("Whoops"); -// } -// return true; -// }); -// -// // Assert -// // - two messages processed -// assertThat(processedCount.get()).isEqualTo(2); -// assertThat(results.size()).isEqualTo(2); -// -// // - one message explicitly deleted -// assertThat(deleteMessageCaptor.getValue().entries().size()).isEqualTo(1); -// assertThat(deleteMessageCaptor.getValue().entries().get(0).id()) -// .isEqualTo(sqsBatchEvent.getRecords().get(sqsBatchEvent.getRecords().size() -1).getMessageId()); -// -// } + @Test + public void singleFailureAtEndOfBatch() { + + // Arrange + Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse + .builder().build()); + + + // Act + AtomicInteger processedCount = new AtomicInteger(); + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> { + int value = processedCount.getAndIncrement(); + if (value == 2) { + throw new RuntimeException("Whoops"); + } + return true; + })); + + // Assert + DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue(); + List messageIds = deleteRequest.entries().stream() + .map(DeleteMessageBatchRequestEntry::id) + .collect(Collectors.toList()); + assertThat(deleteRequest.entries().size()).isEqualTo(2); + assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue(); + assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(1).getMessageId())).isTrue(); + assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(2).getMessageId())).isFalse(); + } @Test public void messageFailureStopsGroupProcessing() { From 8d42118d48bac829eb01f07136e73b53a3341a70 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 15 Jun 2023 11:59:00 +0800 Subject: [PATCH 07/15] Update doco --- docs/utilities/batch.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 0a2add081..d782c109b 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -4,6 +4,9 @@ description: Utility --- The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS. +The utility handles batch processing for both +[standard](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) and +[FIFO](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) SQS queues. **Key Features** @@ -110,8 +113,11 @@ Both have nearly the same behaviour when it comes to processing messages from th * **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost * **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will: - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch` - - **2)** if, non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`. - - **3)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue + - **2)** If a message with a [message group ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) fails, + the processing of the batch will be stopped and the remainder of the messages will be returned to SQS. + This behaviour [is required to handle SQS FIFO queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting). + - **3)** if non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`. + - **4)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue The only difference is that **SqsUtils Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the utility will have access to both successful and failed messaged along with failure exceptions. From 7105b0e239f800307c1b55fd7a7ed62d5e7e8f8e Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Wed, 5 Jul 2023 15:11:16 +0200 Subject: [PATCH 08/15] Address some review comments --- .../java/software/amazon/lambda/powertools/sqs/SqsUtils.java | 4 ++-- .../exception/SkippedMessageDueToFailedBatchException.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 0c4e1e1c2..c3892c64b 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -45,7 +45,7 @@ public final class SqsUtils { private static S3Client s3Client; // The attribute on an SQS-FIFO message used to record the message group ID - private static final String MessageGroupIdKey = "MessageGroupId"; + private static final String MESSAGE_GROUP_ID = "MessageGroupId"; private SqsUtils() { } @@ -519,7 +519,7 @@ public static List batchProcessor(final SQSEvent event, // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure // now stops us from processing the rest of the batch. String messageGroupId = message.getAttributes() != null ? - message.getAttributes().get(MessageGroupIdKey) : null; + message.getAttributes().get(MESSAGE_GROUP_ID) : null; if (messageGroupId != null) { failWholeBatch = true; diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java index 280cfe568..9dbb66509 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/exception/SkippedMessageDueToFailedBatchException.java @@ -1,7 +1,8 @@ package software.amazon.lambda.powertools.sqs.exception; /** - * Thrown to indicate that a + * Indicates that a message has been skipped due to the batch it is + * within failing. */ public class SkippedMessageDueToFailedBatchException extends Exception { From 617ec8a272004cc3b2c2d524d7d96c2c425f3831 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Jul 2023 20:14:57 +0200 Subject: [PATCH 09/15] Use event serialization helpers --- powertools-sqs/pom.xml | 4 ++++ .../powertools/sqs/SqsUtilsFifoBatchProcessorTest.java | 4 +++- powertools-sqs/src/test/resources/SqsFifoBatchEvent.json | 8 ++++---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml index 5eafcc8d3..618aa948c 100644 --- a/powertools-sqs/pom.xml +++ b/powertools-sqs/pom.xml @@ -45,6 +45,10 @@ software.amazon.lambda powertools-core + + com.amazonaws + aws-lambda-java-tests + com.amazonaws aws-lambda-java-core diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java index a5fba686d..d94a74dea 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -1,6 +1,7 @@ package software.amazon.lambda.powertools.sqs; import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.tests.EventLoader; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,7 +38,8 @@ public class SqsUtilsFifoBatchProcessorTest { private ArgumentCaptor deleteMessageBatchCaptor; public SqsUtilsFifoBatchProcessorTest() throws IOException { - sqsBatchEvent = MAPPER.readValue(SqsUtilsFifoBatchProcessorTest.class.getResource("/SqsFifoBatchEvent.json"), SQSEvent.class); + //sqsBatchEvent = MAPPER.readValue(SqsUtilsFifoBatchProcessorTest.class.getResource("/SqsFifoBatchEvent.json"), SQSEvent.class); + sqsBatchEvent = EventLoader.loadSQSEvent("SqsFifoBatchEvent.json"); } @BeforeEach diff --git a/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json index a66dacd60..726e45ea1 100644 --- a/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json +++ b/powertools-sqs/src/test/resources/SqsFifoBatchEvent.json @@ -1,5 +1,5 @@ { - "records": [ + "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", @@ -14,7 +14,7 @@ "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", - "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", "awsRegion": "us-east-2" }, { @@ -44,7 +44,7 @@ }, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", - "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", "awsRegion": "us-east-2" }, { @@ -66,7 +66,7 @@ }, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", - "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", "awsRegion": "us-east-2" } ] From 0f7813339944bcc215cf02eb94936a3891e7f3b5 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Jul 2023 21:41:55 +0200 Subject: [PATCH 10/15] What about queues? --- .../lambda/powertools/sqs/SqsUtils.java | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index c3892c64b..0b278dac6 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -15,7 +15,9 @@ import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.function.Function; import java.util.stream.Collectors; @@ -496,41 +498,43 @@ public static List batchProcessor(final SQSEvent event, // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - boolean failWholeBatch = false; BatchContext batchContext = new BatchContext(client); - - for (SQSMessage message : event.getRecords()) { - // If the batch has already failed, we mark each message past the failure point as failed - if (failWholeBatch) { - LOG.info("Skipping message {} as another message with a message group failed in this batch", - message.getMessageId()); - batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); - } else { - // If the batch hasn't failed, try process the message - try { - handlerReturn.add(handler.process(message)); - batchContext.addSuccess(message); - } catch(Exception e){ - - // Record the failure - batchContext.addFailure(message, e); - - // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure - // now stops us from processing the rest of the batch. - String messageGroupId = message.getAttributes() != null ? - message.getAttributes().get(MESSAGE_GROUP_ID) : null; - - if (messageGroupId != null) { - failWholeBatch = true; - LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" - , messageGroupId, message.getMessageId()); - } - LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); + Queue messagesToProcess = new LinkedList<>(event.getRecords()); + while (!messagesToProcess.isEmpty()) { + SQSMessage message = messagesToProcess.remove(); + // If the batch hasn't failed, try process the message + try { + handlerReturn.add(handler.process(message)); + batchContext.addSuccess(message); + } catch(Exception e){ + + // Record the failure + batchContext.addFailure(message, e); + + // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure + // now stops us from processing the rest of the batch; we break out of the loop leaving unprocessed + // messages in the queu + String messageGroupId = message.getAttributes() != null ? + message.getAttributes().get(MESSAGE_GROUP_ID) : null; + if (messageGroupId != null) { + LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" + , messageGroupId, message.getMessageId()); + break; } + LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); } } + // If we have a FIFO batch failure, unprocessed messages will remain on the queue + // past the failed message. We have to add these to the errors + while (!messagesToProcess.isEmpty()) { + SQSMessage message = messagesToProcess.remove(); + LOG.info("Skipping message {} as another message with a message group failed in this batch", + message.getMessageId()); + batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); + } + batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions); return handlerReturn; From c0cd20b1bdf8213c5ec911000ff3e503a0d7305b Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Jul 2023 21:46:03 +0200 Subject: [PATCH 11/15] Rejig --- .../amazon/lambda/powertools/sqs/SqsUtils.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 0b278dac6..cd37e8be2 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -495,10 +495,6 @@ public static List batchProcessor(final SQSEvent event, client = SqsClient.create(); } - // If we are working on a FIFO queue, when any message fails we should stop processing and return the - // rest of the batch as failed too. We use this variable to track when that has happened. - // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - BatchContext batchContext = new BatchContext(client); Queue messagesToProcess = new LinkedList<>(event.getRecords()); while (!messagesToProcess.isEmpty()) { @@ -514,7 +510,8 @@ public static List batchProcessor(final SQSEvent event, // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure // now stops us from processing the rest of the batch; we break out of the loop leaving unprocessed - // messages in the queu + // messages in the queue + // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting String messageGroupId = message.getAttributes() != null ? message.getAttributes().get(MESSAGE_GROUP_ID) : null; if (messageGroupId != null) { @@ -528,15 +525,13 @@ public static List batchProcessor(final SQSEvent event, // If we have a FIFO batch failure, unprocessed messages will remain on the queue // past the failed message. We have to add these to the errors - while (!messagesToProcess.isEmpty()) { - SQSMessage message = messagesToProcess.remove(); + messagesToProcess.forEach(message -> { LOG.info("Skipping message {} as another message with a message group failed in this batch", message.getMessageId()); batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); - } + }); batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions); - return handlerReturn; } From 8480a41bbd6966b0f2a9f6e5b9482464cb7369e6 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 11 Jul 2023 09:52:33 +0200 Subject: [PATCH 12/15] Update powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jérôme Van Der Linden <117538+jeromevdl@users.noreply.github.com> --- .../java/software/amazon/lambda/powertools/sqs/SqsUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index cd37e8be2..c96f702d8 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -503,7 +503,7 @@ public static List batchProcessor(final SQSEvent event, try { handlerReturn.add(handler.process(message)); batchContext.addSuccess(message); - } catch(Exception e){ + } catch (Exception e) { // Record the failure batchContext.addFailure(message, e); From a99b842ebcf066b5ec8410a9f6a28ee14fcd3d05 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 11 Jul 2023 09:55:34 +0200 Subject: [PATCH 13/15] Address review comments --- .../lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java index d94a74dea..517e46af1 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -27,7 +27,6 @@ public class SqsUtilsFifoBatchProcessorTest { - private static final ObjectMapper MAPPER = new ObjectMapper(); private static SQSEvent sqsBatchEvent; private MockitoSession session; @@ -38,7 +37,6 @@ public class SqsUtilsFifoBatchProcessorTest { private ArgumentCaptor deleteMessageBatchCaptor; public SqsUtilsFifoBatchProcessorTest() throws IOException { - //sqsBatchEvent = MAPPER.readValue(SqsUtilsFifoBatchProcessorTest.class.getResource("/SqsFifoBatchEvent.json"), SQSEvent.class); sqsBatchEvent = EventLoader.loadSQSEvent("SqsFifoBatchEvent.json"); } From 685177605bc9d5a96f4a1ea96102679c06e9475b Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 11 Jul 2023 15:07:15 +0200 Subject: [PATCH 14/15] Increase coverage --- .../lambda/powertools/sqs/SqsUtils.java | 25 +++++++---- .../sqs/SqsUtilsFifoBatchProcessorTest.java | 44 +++++++++++++++++++ 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index c96f702d8..007ffb0b2 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -496,9 +496,14 @@ public static List batchProcessor(final SQSEvent event, } BatchContext batchContext = new BatchContext(client); - Queue messagesToProcess = new LinkedList<>(event.getRecords()); - while (!messagesToProcess.isEmpty()) { - SQSMessage message = messagesToProcess.remove(); + int offset = 0; + while (offset < event.getRecords().size()) { + // Get the current message and advance to the next. Doing this here + // makes it easier for us to know where we are up to if we have to + // break out of here early. + SQSMessage message = event.getRecords().get(offset); + offset++; + // If the batch hasn't failed, try process the message try { handlerReturn.add(handler.process(message)); @@ -525,11 +530,15 @@ public static List batchProcessor(final SQSEvent event, // If we have a FIFO batch failure, unprocessed messages will remain on the queue // past the failed message. We have to add these to the errors - messagesToProcess.forEach(message -> { - LOG.info("Skipping message {} as another message with a message group failed in this batch", - message.getMessageId()); - batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); - }); + if (offset < event.getRecords().size()) { + event.getRecords() + .subList(offset, event.getRecords().size()) + .forEach(message -> { + LOG.info("Skipping message {} as another message with a message group failed in this batch", + message.getMessageId()); + batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); + }); + } batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions); return handlerReturn; diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java index 517e46af1..cfa79dc36 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsFifoBatchProcessorTest.java @@ -71,6 +71,50 @@ public void processWholeBatch() { assertThat(results.size()).isEqualTo(3); } + /** + * Check that a failure in the middle of the batch: + * - deletes the successful message explicitly from SQS + * - marks the failed and subsequent message as failed + * - does not delete the failed or subsequent message + */ + @Test + public void singleFailureInMiddleOfBatch() { + // Arrange + Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse + .builder().build()); + + // Act + AtomicInteger processedCount = new AtomicInteger(); + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> { + int value = processedCount.getAndIncrement(); + if (value == 1) { + throw new RuntimeException("Whoops"); + } + return true; + })) + + // Assert + .isInstanceOf(SQSBatchProcessingException.class) + .satisfies(e -> { + List failures = ((SQSBatchProcessingException)e).getFailures(); + assertThat(failures.size()).isEqualTo(2); + List failureIds = failures.stream() + .map(SQSEvent.SQSMessage::getMessageId) + .collect(Collectors.toList()); + assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(1).getMessageId()); + assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(2).getMessageId()); + }); + + DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue(); + List messageIds = deleteRequest.entries().stream() + .map(DeleteMessageBatchRequestEntry::id) + .collect(Collectors.toList()); + assertThat(deleteRequest.entries().size()).isEqualTo(1); + assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue(); + + } + @Test public void singleFailureAtEndOfBatch() { From 2a39a84a690d29426f731e7222feab58fcab791b Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 11 Jul 2023 16:00:43 +0200 Subject: [PATCH 15/15] Remove break --- .../java/software/amazon/lambda/powertools/sqs/SqsUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 007ffb0b2..9fff4dc6f 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -497,7 +497,8 @@ public static List batchProcessor(final SQSEvent event, BatchContext batchContext = new BatchContext(client); int offset = 0; - while (offset < event.getRecords().size()) { + boolean failedBatch = false; + while (offset < event.getRecords().size() && !failedBatch) { // Get the current message and advance to the next. Doing this here // makes it easier for us to know where we are up to if we have to // break out of here early. @@ -522,7 +523,7 @@ public static List batchProcessor(final SQSEvent event, if (messageGroupId != null) { LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); - break; + failedBatch = true; } LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); }