From 3c8a447eeb8db024079b25a23f66257a5c163bd0 Mon Sep 17 00:00:00 2001 From: Ashley Frieze Date: Wed, 17 Jul 2019 21:49:30 +0100 Subject: [PATCH] Prevent a batch of messages going over the threshold --- .gitignore | 2 + .../AmazonSQSExtendedClient.java | 49 ++++++++--- .../AmazonSQSExtendedClientTest.java | 81 +++++++++++++++++-- 3 files changed, 115 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 2f7896d..ec10551 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ target/ +.idea/ +*.iml diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index 8200d7b..c1926df 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -22,9 +22,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.Map.Entry; import com.amazonaws.AmazonClientException; @@ -775,15 +773,41 @@ public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessa List batchEntries = sendMessageBatchRequest.getEntries(); + List smallMessages = new LinkedList<>(); + long totalSize = moveIndividuallyLargeMessagesToS3(batchEntries, smallMessages); + + if (isLarge(totalSize) && !smallMessages.isEmpty()) { + moveRemainingMessagesToS3(batchEntries, smallMessages); + } + + return super.sendMessageBatch(sendMessageBatchRequest); + } + + private void moveRemainingMessagesToS3(List batchEntries, List smallMessages) { + for (Integer smallMessageIndex : smallMessages) { + batchEntries.set(smallMessageIndex, storeMessageInS3(batchEntries.get(smallMessageIndex))); + } + } + + private long moveIndividuallyLargeMessagesToS3(List batchEntries, + List smallMessages) { int index = 0; + long totalSize = 0; for (SendMessageBatchRequestEntry entry : batchEntries) { - if (clientConfiguration.isAlwaysThroughS3() || isLarge(entry)) { - batchEntries.set(index, storeMessageInS3(entry)); + long entrySize = sizeOf(entry); + + if (clientConfiguration.isAlwaysThroughS3() || isLarge(entrySize)) { + SendMessageBatchRequestEntry storedVersion = storeMessageInS3(entry); + batchEntries.set(index, storedVersion); + totalSize += sizeOf(storedVersion); + } else { + totalSize += entrySize; + smallMessages.add(index); } + ++index; } - - return super.sendMessageBatch(sendMessageBatchRequest); + return totalSize; } /** @@ -1215,14 +1239,17 @@ private boolean isLarge(SendMessageRequest sendMessageRequest) { int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.getMessageAttributes()); long msgBodySize = getStringSizeInBytes(sendMessageRequest.getMessageBody()); long totalMsgSize = msgAttributesSize + msgBodySize; - return (totalMsgSize > clientConfiguration.getMessageSizeThreshold()); + return isLarge(totalMsgSize); } - private boolean isLarge(SendMessageBatchRequestEntry batchEntry) { + private boolean isLarge(long size) { + return (size > clientConfiguration.getMessageSizeThreshold()); + } + + private long sizeOf(SendMessageBatchRequestEntry batchEntry) { int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes()); long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody()); - long totalMsgSize = msgAttributesSize + msgBodySize; - return (totalMsgSize > clientConfiguration.getMessageSizeThreshold()); + return msgAttributesSize + msgBodySize; } private int getMsgAttributesSize(Map msgAttributes) { diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index 3dd70c3..0cbf97c 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -61,7 +61,7 @@ public class AmazonSQSExtendedClientTest { private static final int MORE_THAN_SQS_SIZE_LIMIT = SQS_SIZE_LIMIT + 1; // should be > 1 and << SQS_SIZE_LIMIT - private static final int ARBITRATY_SMALLER_THRESSHOLD = 500; + private static final int ARBITRARY_SMALLER_THRESHOLD = 500; @Before public void setupClient() { @@ -128,10 +128,10 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill @Test public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored() { - int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2; + int messageLength = ARBITRARY_SMALLER_THRESHOLD * 2; String messageBody = generateStringWithLength(messageLength); ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() - .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD); + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRARY_SMALLER_THRESHOLD); AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration)); @@ -158,9 +158,43 @@ public void testReceiveMessageMultipleTimesDoesNotAdditionallyAlterReceiveMessag Assert.assertEquals(expectedRequest, messageRequest); } + @Test + public void testWhenSmallMessageBatchIsSentThenNoMessagesStoredInS3() { + // This creates 10 messages all well within the threshold + + int[] messageLengthForCounter = new int[] { + 1_000, + 1_000, + 1_000, + 1_000, + 1_000, + 1_000, + 1_000, + 1_000, + 1_000, + 1_000 + }; + + List batchEntries = new ArrayList(); + for (int i = 0; i < 10; i++) { + SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); + int messageLength = messageLengthForCounter[i]; + String messageBody = generateStringWithLength(messageLength); + entry.setMessageBody(messageBody); + entry.setId("entry_" + i); + batchEntries.add(entry); + } + + SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); + extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); + + // There should be no puts + verify(mockS3, never()).putObject(isA(PutObjectRequest.class)); + } + @Test public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStoredInS3() { - // This creates 10 messages, out of which only two are below the threshold (100K and 200K), + // This creates 10 messages, out of which only two are below the threshold (100K and 20K), // and the other 8 are above the threshold int[] messageLengthForCounter = new int[] { @@ -172,7 +206,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor 700_000, 800_000, 900_000, - 200_000, + 20_000, 1000_000 }; @@ -189,10 +223,45 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); - // There should be 8 puts for the 8 messages above the threshhold + // There should be 8 puts for the 8 messages above the threshold verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class)); } + @Test + public void testWhenMessageBatchIsSentWhereSumOfMessageSizesIsOverTheThresholdThenAllArePutToS3() { + // This creates 10 messages, all of which are below the threshold, but together would make + // a single request over the threshold + + int[] messageLengthForCounter = new int[] { + 26_214, + 26_214, + 26_214, + 26_214, + 26_214, + 26_214, + 26_214, + 26_214, + 26_214, + 26_219 + }; + + List batchEntries = new ArrayList(); + for (int i = 0; i < 10; i++) { + SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); + int messageLength = messageLengthForCounter[i]; + String messageBody = generateStringWithLength(messageLength); + entry.setMessageBody(messageBody); + entry.setId("entry_" + i); + batchEntries.add(entry); + } + + SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); + extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); + + // There should be 10 puts, as the whole batch is moved to s3 + verify(mockS3, times(10)).putObject(isA(PutObjectRequest.class)); + } + @Test public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() { int messageLength = LESS_THAN_SQS_SIZE_LIMIT;