Skip to content

Commit 7a852bf

Browse files
committed
ISSUE-30: added ignorePayloadNotFound optional parameter in case you want to get rid of SQS messages that do not have their corresponding payload
1 parent 0208e1a commit 7a852bf

File tree

3 files changed

+108
-1
lines changed

3 files changed

+108
-1
lines changed

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
349349
ReceiveMessageResult receiveMessageResult = super.receiveMessage(receiveMessageRequest);
350350

351351
List<Message> messages = receiveMessageResult.getMessages();
352+
List<Message> messagesToIgnore = new ArrayList<>();
352353
for (Message message : messages) {
353354

354355
// for each received message check if they are stored in S3.
@@ -357,7 +358,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
357358
String largeMessagePointer = message.getBody();
358359
largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer");
359360

360-
message.setBody(payloadStore.getOriginalPayload(largeMessagePointer));
361+
final String originalBody;
362+
try {
363+
originalBody = payloadStore.getOriginalPayload(largeMessagePointer);
364+
} catch (AmazonServiceException e) {
365+
boolean isNoSuchKeyException = ((AmazonServiceException) e.getCause()).getErrorCode().equals("NoSuchKey");
366+
if (isNoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) {
367+
deleteMessage(receiveMessageRequest.getQueueUrl(), message.getReceiptHandle());
368+
LOG.warn("SQS message deleted as it could not be found in S3");
369+
messagesToIgnore.add(message);
370+
continue;
371+
}
372+
throw e;
373+
}
374+
375+
// Replace the large message pointer with the original message body
376+
message.setBody(originalBody);
361377

362378
// remove the additional attribute before returning the message
363379
// to user.
@@ -371,6 +387,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
371387
message.setReceiptHandle(modifiedReceiptHandle);
372388
}
373389
}
390+
receiveMessageResult.getMessages().removeAll(messagesToIgnore);
374391
return receiveMessageResult;
375392
}
376393

src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class ExtendedClientConfiguration extends PayloadStorageConfiguration {
3131

3232
private boolean cleanupS3Payload = true;
3333
private boolean useLegacyReservedAttributeName = true;
34+
private boolean ignorePayloadNotFound = false;
3435

3536
public ExtendedClientConfiguration() {
3637
super();
@@ -41,6 +42,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
4142
super(other);
4243
this.cleanupS3Payload = other.doesCleanupS3Payload();
4344
this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName();
45+
this.ignorePayloadNotFound = other.ignoresPayloadNotFound();
4446
}
4547

4648
/**
@@ -100,6 +102,32 @@ public ExtendedClientConfiguration withLegacyReservedAttributeNameDisabled() {
100102
return this;
101103
}
102104

105+
/**
106+
* Sets whether or not messages should be removed from Amazon SQS
107+
* when payloads are not found in Amazon S3.
108+
*
109+
* @param ignorePayloadNotFound
110+
* Whether or not messages should be removed from Amazon SQS
111+
* when payloads are not found in Amazon S3. Default: false
112+
*/
113+
public void setIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
114+
this.ignorePayloadNotFound = ignorePayloadNotFound;
115+
}
116+
117+
/**
118+
* Sets whether or not messages should be removed from Amazon SQS
119+
* when payloads are not found in Amazon S3.
120+
*
121+
* @param ignorePayloadNotFound
122+
* Whether or not messages should be removed from Amazon SQS
123+
* when payloads are not found in Amazon S3. Default: false
124+
* @return the updated ExtendedClientConfiguration object.
125+
*/
126+
public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
127+
setIgnorePayloadNotFound(ignorePayloadNotFound);
128+
return this;
129+
}
130+
103131
/**
104132
* Checks whether or not clean up large objects in S3 is enabled.
105133
*
@@ -121,6 +149,17 @@ public boolean usesLegacyReservedAttributeName() {
121149
return useLegacyReservedAttributeName;
122150
}
123151

152+
/**
153+
* Checks whether or not messages should be removed from Amazon SQS
154+
* when payloads are not found in Amazon S3.
155+
*
156+
* @return True if messages should be removed from Amazon SQS
157+
* when payloads are not found in Amazon S3. Default: false
158+
*/
159+
public boolean ignoresPayloadNotFound() {
160+
return ignorePayloadNotFound;
161+
}
162+
124163
@Override
125164
public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) {
126165
setAlwaysThroughS3(alwaysThroughS3);

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.stream.Collectors;
2020
import java.util.stream.IntStream;
2121

22+
import com.amazonaws.AmazonServiceException;
2223
import com.amazonaws.services.s3.AmazonS3;
2324
import com.amazonaws.services.s3.model.*;
2425
import com.amazonaws.services.sqs.AmazonSQS;
@@ -33,6 +34,7 @@
3334
import software.amazon.payloadoffloading.PayloadS3Pointer;
3435

3536
import static org.mockito.Matchers.eq;
37+
import static org.mockito.Mockito.doThrow;
3638
import static org.mockito.Mockito.mock;
3739
import static org.mockito.Mockito.isA;
3840
import static org.mockito.Mockito.when;
@@ -407,6 +409,55 @@ public void testWhenLargeMessageIsSentThenAttributeWithPayloadSizeIsAdded() {
407409
Assert.assertEquals(messageLength, (int) Integer.valueOf(attributes.get(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME).getStringValue()));
408410
}
409411

412+
@Test
413+
public void testWhenIgnorePayloadNotFoundIsSentThenNotFoundKeysInS3AreDeletedInSQS() {
414+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
415+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withIgnorePayloadNotFound(true);
416+
417+
AmazonServiceException mockException = mock(AmazonServiceException.class);
418+
when(mockException.getErrorCode()).thenReturn("NoSuchKey");
419+
420+
Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class));
421+
String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson();
422+
message.setBody(pointer);
423+
message.setReceiptHandle("receipt-handle");
424+
425+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message));
426+
doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class));
427+
428+
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
429+
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest().withQueueUrl(SQS_QUEUE_URL);
430+
ReceiveMessageResult actualReceiveMessageResult = sqsExtended.receiveMessage(messageRequest);
431+
Assert.assertTrue(actualReceiveMessageResult.getMessages().isEmpty());
432+
433+
ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
434+
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
435+
Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().getQueueUrl());
436+
Assert.assertEquals("receipt-handle", deleteMessageRequestArgumentCaptor.getValue().getReceiptHandle());
437+
}
438+
439+
@Test
440+
public void testWhenIgnorePayloadNotFoundIsNotSentThenNotFoundKeysInS3AreNotDeletedInSQS() {
441+
AmazonServiceException mockException = mock(AmazonServiceException.class);
442+
when(mockException.getErrorCode()).thenReturn("NoSuchKey");
443+
444+
Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class));
445+
String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson();
446+
message.setBody(pointer);
447+
448+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message));
449+
450+
doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class));
451+
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest();
452+
453+
try {
454+
extendedSqsWithDefaultConfig.receiveMessage(messageRequest);
455+
Assert.fail("exception should have been thrown");
456+
} catch (AmazonServiceException e) {
457+
verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class));
458+
}
459+
}
460+
410461
@Test
411462
public void testDefaultExtendedClientDeletesSmallMessage() {
412463
// given

0 commit comments

Comments
 (0)