Skip to content

Split batches which would go over the threshold #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
target/
.idea/
*.iml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.Map.Entry;

import com.amazonaws.AmazonClientException;
Expand Down Expand Up @@ -707,6 +705,13 @@ public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibi
* S3 when necessary.
* </p>
* <p>
* Because the combined size of a batch can exceed the threshold even when every
* individual message fits, this method splits the batch into smaller batches and
* sends them in order, preserving the order of messages. If any call to send a
* message batch reports failed entries, sending stops and the remaining entries
* are not sent. In the worst case, where every message is just under the
* threshold, a batch of n messages is sent as n separate batches.
* </p>
* <p>
* If the <code>DelaySeconds</code> parameter is not specified for an entry,
* the default for the queue is used.
* </p>
Expand Down Expand Up @@ -741,7 +746,8 @@ public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibi
* SendMessageBatch service method on AmazonSQS.
*
* @return The response from the SendMessageBatch service method, as
* returned by AmazonSQS.
* returned by AmazonSQS. <b>Note:</b> if there are multiple calls
* then this is a composite of the calls.
*
* @throws BatchEntryIdsNotDistinctException
* @throws TooManyEntriesInBatchRequestException
Expand Down Expand Up @@ -774,16 +780,88 @@ public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessa
}

List<SendMessageBatchRequestEntry> batchEntries = sendMessageBatchRequest.getEntries();
List<Long> messageSizes = measureMessagesAndMoveLargeOnesToS3(batchEntries);

return splitAndSendMessageBatches(sendMessageBatchRequest, batchEntries, messageSizes);
}

/**
 * Sends the given entries as one or more consecutive batches, each sized by
 * {@link #getSliceOfBatchToSend(List)}, preserving the order of the entries.
 * <p>
 * Sending stops after the first call whose result contains failed entries; entries
 * that were not yet sent are neither sent nor reported in the result.
 * </p>
 *
 * @param sendMessageBatchRequest
 *            the request used for every call; its entries are replaced with each
 *            slice while sending and set back to {@code batchEntries} before returning
 * @param batchEntries
 *            the entries to send, in order
 * @param messageSizes
 *            the size of each entry, index-aligned with {@code batchEntries}
 * @return the result of the last call that was made, with its successful entries
 *         preceded by the successful entries of all earlier calls
 */
private SendMessageBatchResult splitAndSendMessageBatches(SendMessageBatchRequest sendMessageBatchRequest,
        List<SendMessageBatchRequestEntry> batchEntries,
        List<Long> messageSizes) {
    // An empty batch has nothing to slice. Pass it through untouched so the caller gets
    // the underlying client's own response rather than the IllegalStateException that
    // getSliceOfBatchToSend raises when it cannot fit a single message.
    if (batchEntries.isEmpty()) {
        return super.sendMessageBatch(sendMessageBatchRequest);
    }

    final int totalEntries = batchEntries.size();
    SendMessageBatchResult result = null;
    int start = 0;

    try {
        while (start < totalEntries) {
            // The sizes list is sliced with its own bounds so the two lists are never
            // indexed with each other's length.
            int end = start + getSliceOfBatchToSend(messageSizes.subList(start, messageSizes.size()));

            // modify the send batch request to use the messages that fit
            sendMessageBatchRequest.setEntries(batchEntries.subList(start, end));
            SendMessageBatchResult sliceResult = super.sendMessageBatch(sendMessageBatchRequest);

            if (result != null) {
                // Earlier calls cannot have failures (we would have stopped), so only their
                // successes are carried forward, ahead of this call's successes. The latest
                // result object is kept because it holds the status of the last call made.
                result.getSuccessful().addAll(sliceResult.getSuccessful());
                sliceResult.setSuccessful(result.getSuccessful());
            }
            result = sliceResult;

            // stop at the first call that reports failures
            if (!result.getFailed().isEmpty()) {
                break;
            }
            start = end;
        }
    } finally {
        // Do not leave the caller's request holding only the last slice that was sent.
        sendMessageBatchRequest.setEntries(batchEntries);
    }

    return result;
}

/**
 * Counts how many leading entries of {@code messageSizes} can be sent together
 * before their running total is considered large by {@link #isLarge(long)}.
 *
 * @param messageSizes
 *            sizes of the messages still to be sent, in send order
 * @return the number of leading messages that fit in one batch; {@code 0} only
 *         when {@code messageSizes} is empty
 * @throws IllegalStateException
 *             if the list is not empty and even the first message does not fit
 */
private int getSliceOfBatchToSend(List<Long> messageSizes) {
    // Nothing to send is not an oversize error: report an empty slice so the caller can
    // forward the empty batch instead of failing with a misleading message.
    if (messageSizes.isEmpty()) {
        return 0;
    }

    int range = 0;
    long totalSize = 0;
    for (long messageSize : messageSizes) {
        // measure the total including this message, to see if
        // we've exceeded the maximum size
        totalSize += messageSize;
        if (isLarge(totalSize)) {
            // stop here
            break;
        }

        // we can include this item
        range++;
    }

    // the earlier code should already have made this impossible, but add error handling rather than
    // a confusing infinite loop/stack overflow
    if (range == 0) {
        throw new IllegalStateException("A message in the batch is larger than the threshold when " +
                "it should already have been exported to S3");
    }
    return range;
}

// Walks the batch entries in order and builds a list of per-entry sizes. Entries that
// are large (or all entries, when the client is configured to always go through S3) are
// replaced in batchEntries by the result of storeMessageInS3, and the size recorded for
// them is the size of that replacement.
// NOTE(review): this hunk is a diff rendering with the +/- markers stripped, so it mixes
// pre-change and post-change lines and is not valid Java as shown. The lines flagged
// below look like the removed ones - confirm against the real file.
private List<Long> measureMessagesAndMoveLargeOnesToS3(List<SendMessageBatchRequestEntry> batchEntries) {
// position of the current entry in batchEntries, used for the in-place replacement
int index = 0;
// NOTE(review): the caller later takes subList views of this list; an ArrayList may be
// a better fit than a LinkedList - confirm.
List<Long> messageSizes = new LinkedList<>();
for (SendMessageBatchRequestEntry entry : batchEntries) {
// NOTE(review): the next two lines appear to be the pre-change check and replacement
// (their opening brace is never closed in this view) - presumably removed by the diff.
if (clientConfiguration.isAlwaysThroughS3() || isLarge(entry)) {
batchEntries.set(index, storeMessageInS3(entry));
// size of the entry as supplied by the caller
long entrySize = sizeOf(entry);

if (clientConfiguration.isAlwaysThroughS3() || isLarge(entrySize)) {
// swap the entry for its stored version and record the stored version's size
SendMessageBatchRequestEntry storedVersion = storeMessageInS3(entry);
batchEntries.set(index, storedVersion);
messageSizes.add(sizeOf(storedVersion));
} else {
// entry stays as it is; record its measured size
messageSizes.add(entrySize);
}

++index;
}

// NOTE(review): the first return below has the wrong type for this method and refers to
// a variable that is not in scope; it appears to be the pre-change tail of
// sendMessageBatch - presumably removed by the diff.
return super.sendMessageBatch(sendMessageBatchRequest);
return messageSizes;
}

/**
Expand Down Expand Up @@ -1215,14 +1293,17 @@ private boolean isLarge(SendMessageRequest sendMessageRequest) {
int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(sendMessageRequest.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
return isLarge(totalMsgSize);
}

private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
/**
 * Tells whether a size is strictly greater than the message size threshold
 * held by the client configuration.
 *
 * @param size
 *            the size to check
 * @return {@code true} if {@code size} exceeds the configured threshold
 */
private boolean isLarge(long size) {
    return clientConfiguration.getMessageSizeThreshold() < size;
}

// Returns the size of a batch entry as the sum of its message-attribute size
// (getMsgAttributesSize) and its body size (getStringSizeInBytes - presumably bytes,
// going by the helper's name).
// NOTE(review): this hunk is a diff rendering with the +/- markers stripped; the
// totalMsgSize line and the boolean return below look like the pre-change body of
// isLarge(SendMessageBatchRequestEntry) and are presumably removed - confirm against
// the real file. As shown, the method has two return statements and is not valid Java.
private long sizeOf(SendMessageBatchRequestEntry batchEntry) {
int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
// int + long is widened to long before being returned
return msgAttributesSize + msgBodySize;
}

private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
Expand Down
Loading