Skip to content

Commit 43f4957

Browse files
author
Damian Nardelli
committed
ISSUE-30: added ignorePayloadNotFound optional parameter in case you want to get rid of SQS messages that do not have their corresponding payload
1 parent df0c625 commit 43f4957

File tree

3 files changed

+93
-1
lines changed

3 files changed

+93
-1
lines changed

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,17 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
365365
String s3MsgBucketName = s3Pointer.getS3BucketName();
366366
String s3MsgKey = s3Pointer.getS3Key();
367367

368-
String origMsgBody = getTextFromS3(s3MsgBucketName, s3MsgKey);
368+
final String origMsgBody;
369+
try {
370+
origMsgBody = getTextFromS3(s3MsgBucketName, s3MsgKey);
371+
} catch (AmazonServiceException e) {
372+
if (((AmazonServiceException) e.getCause()).getErrorCode().equals("NoSuchKey")) {
373+
deleteMessage(receiveMessageRequest.getQueueUrl(), message.getReceiptHandle());
374+
LOG.warn("SQS message deleted as it could not be found in S3");
375+
continue;
376+
}
377+
throw e;
378+
}
369379
LOG.info("S3 object read, Bucket name: " + s3MsgBucketName + ", Object key: " + s3MsgKey + ".");
370380

371381
message.setBody(origMsgBody);

src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ExtendedClientConfiguration {
3535
private String s3BucketName;
3636
private boolean largePayloadSupport = false;
3737
private boolean alwaysThroughS3 = false;
38+
private boolean ignorePayloadNotFound = false;
3839
private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD;
3940

4041
public ExtendedClientConfiguration() {
@@ -47,6 +48,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
4748
this.s3BucketName = other.s3BucketName;
4849
this.largePayloadSupport = other.largePayloadSupport;
4950
this.alwaysThroughS3 = other.alwaysThroughS3;
51+
this.ignorePayloadNotFound = other.ignorePayloadNotFound;
5052
this.messageSizeThreshold = other.messageSizeThreshold;
5153
}
5254

@@ -214,4 +216,43 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3)
214216
public boolean isAlwaysThroughS3() {
215217
return alwaysThroughS3;
216218
}
219+
220+
/**
221+
* Sets whether or not messages should be removed from Amazon SQS
222+
* when payloads are not found in Amazon S3.
223+
*
224+
* @param ignorePayloadNotFound
225+
* Whether or not messages should be removed from Amazon SQS
226+
* when payloads are not found in Amazon S3. Default: false
227+
*/
228+
public void setIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
229+
this.ignorePayloadNotFound = ignorePayloadNotFound;
230+
}
231+
232+
/**
233+
* Sets whether or not messages should be removed from Amazon SQS
234+
* when payloads are not found in Amazon S3.
235+
*
236+
* @param ignorePayloadNotFound
237+
* Whether or not messages should be removed from Amazon SQS
238+
* when payloads are not found in Amazon S3. Default: false
239+
* @return the updated ExtendedClientConfiguration object.
240+
*/
241+
public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
242+
setIgnorePayloadNotFound(ignorePayloadNotFound);
243+
return this;
244+
}
245+
246+
/**
247+
* Checks whether or not messages should be removed from Amazon SQS
248+
* when payloads are not found in Amazon S3.
249+
*
250+
* @return True if messages should be removed from Amazon SQS
251+
* when payloads are not found in Amazon S3. Default: false
252+
*/
253+
public boolean isIgnorePayloadNotFound() {
254+
return ignorePayloadNotFound;
255+
}
256+
257+
217258
}

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.Arrays;
20+
import java.util.Collections;
21+
import java.util.HashMap;
2022
import java.util.List;
2123
import java.util.Map;
2224

25+
import com.amazonaws.AmazonServiceException;
2326
import com.amazonaws.services.s3.AmazonS3;
27+
import com.amazonaws.services.s3.model.GetObjectRequest;
2428
import com.amazonaws.services.s3.model.PutObjectRequest;
2529
import com.amazonaws.services.sqs.AmazonSQS;
2630
import com.amazonaws.services.sqs.AmazonSQSClient;
31+
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
32+
import com.amazonaws.services.sqs.model.Message;
2733
import com.amazonaws.services.sqs.model.MessageAttributeValue;
2834
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
2935
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@@ -36,7 +42,9 @@
3642
import org.junit.Test;
3743
import org.mockito.ArgumentCaptor;
3844

45+
import static org.mockito.Matchers.any;
3946
import static org.mockito.Matchers.eq;
47+
import static org.mockito.Mockito.doThrow;
4048
import static org.mockito.Mockito.isA;
4149
import static org.mockito.Mockito.mock;
4250
import static org.mockito.Mockito.never;
@@ -224,6 +232,39 @@ public void testWhenLargeMessgaeIsSentThenAttributeWithPayloadSizeIsAdded() {
224232
Assert.assertEquals(messageLength, (int)Integer.valueOf(attributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME).getStringValue()));
225233
}
226234

235+
@Test
236+
public void testWhenIgnorePayloadNotFoundIsSentThenNotFoundKeysInS3AreDeletedInSQS() throws Exception {
237+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
238+
.withIgnorePayloadNotFound(true).withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME);
239+
240+
AmazonServiceException mockException = mock(AmazonServiceException.class);
241+
when(mockException.getErrorCode()).thenReturn("NoSuchKey");
242+
243+
Message mockMessage = mock(Message.class);
244+
MessageS3Pointer messageS3Pointer = new MessageS3Pointer();
245+
when(mockMessage.getBody()).thenReturn(new JsonDataConverter().serializeToJson(messageS3Pointer));
246+
when(mockMessage.getReceiptHandle()).thenReturn("receipt-handle");
247+
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
248+
messageAttributes.put(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, new MessageAttributeValue());
249+
when(mockMessage.getMessageAttributes()).thenReturn(messageAttributes);
250+
251+
ReceiveMessageResult mockReceiveMessageResult = mock(ReceiveMessageResult.class);
252+
when(mockSqsBackend.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockReceiveMessageResult);
253+
when(mockReceiveMessageResult.getMessages()).thenReturn(Collections.singletonList(mockMessage));
254+
255+
doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class));
256+
257+
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
258+
259+
sqsExtended.receiveMessage(SQS_QUEUE_URL);
260+
261+
ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
262+
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
263+
264+
Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().getQueueUrl());
265+
Assert.assertEquals("receipt-handle", deleteMessageRequestArgumentCaptor.getValue().getReceiptHandle());
266+
}
267+
227268
private String generateStringWithLength(int messageLength) {
228269
char[] charArray = new char[messageLength];
229270
Arrays.fill(charArray, 'x');

0 commit comments

Comments
 (0)