Skip to content

Commit b458ea0

Browse files
Delete SQS messages when S3 payload not found (#78)
Co-authored-by: Mit Suthar <[email protected]>
1 parent 07d988c commit b458ea0

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import software.amazon.awssdk.core.exception.SdkClientException;
3939
import software.amazon.awssdk.core.exception.SdkException;
4040
import software.amazon.awssdk.core.util.VersionInfo;
41+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
4142
import software.amazon.awssdk.services.sqs.SqsClient;
4243
import software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException;
4344
import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException;
@@ -340,7 +341,20 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag
340341
String largeMessagePointer = message.body();
341342
largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer");
342343

343-
messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer));
344+
try {
345+
messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer));
346+
} catch (SdkException e) {
347+
if (e.getCause() instanceof NoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) {
348+
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest
349+
.builder()
350+
.queueUrl(receiveMessageRequest.queueUrl())
351+
.receiptHandle(message.receiptHandle())
352+
.build();
353+
deleteMessage(deleteMessageRequest);
354+
LOG.warn("Message deleted from SQS since payload with pointer could not be found in S3.");
355+
continue;
356+
} else throw e;
357+
}
344358

345359
// remove the additional attribute before returning the message
346360
// to user.

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2626
import software.amazon.awssdk.core.ApiName;
2727
import software.amazon.awssdk.core.ResponseInputStream;
28+
import software.amazon.awssdk.core.exception.SdkException;
2829
import software.amazon.awssdk.core.sync.RequestBody;
2930
import software.amazon.awssdk.http.AbortableInputStream;
3031
import software.amazon.awssdk.services.s3.S3Client;
3132
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
3233
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
3334
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
35+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
3436
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
3537
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3638
import software.amazon.awssdk.services.sqs.SqsClient;
@@ -59,6 +61,7 @@
5961

6062
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.USER_AGENT_NAME;
6163
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.USER_AGENT_VERSION;
64+
6265
import static org.junit.jupiter.api.Assertions.assertEquals;
6366
import static org.junit.jupiter.api.Assertions.assertFalse;
6467
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -696,6 +699,60 @@ private void testReceiveMessage_when_MessageIsLarge(String reservedAttributeName
696699
verify(mockS3, times(1)).getObject(isA(GetObjectRequest.class));
697700
}
698701

702+
@Test
703+
public void testReceiveMessage_when_ignorePayloadNotFound_then_messageWithPayloadNotFoundIsDeletedFromSQS() {
704+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
705+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
706+
.withIgnorePayloadNotFound(true);
707+
SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
708+
709+
String receiptHandle = "receipt-handle";
710+
Message message = Message.builder()
711+
.messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build()))
712+
.body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson())
713+
.receiptHandle(receiptHandle)
714+
.build();
715+
716+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build());
717+
doThrow(NoSuchKeyException.class).when(mockS3).getObject(any(GetObjectRequest.class));
718+
719+
ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().queueUrl(SQS_QUEUE_URL).build();
720+
ReceiveMessageResponse receiveMessageResponse = sqsExtended.receiveMessage(messageRequest);
721+
722+
Assert.assertTrue(receiveMessageResponse.messages().isEmpty());
723+
724+
ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
725+
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
726+
Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().queueUrl());
727+
Assert.assertEquals(receiptHandle, deleteMessageRequestArgumentCaptor.getValue().receiptHandle());
728+
}
729+
730+
@Test
731+
public void testReceiveMessage_when_ignorePayloadNotFoundIsFalse_then_messageWithPayloadNotFoundThrowsException() {
732+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
733+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
734+
.withIgnorePayloadNotFound(false);
735+
SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
736+
737+
Message message = Message.builder()
738+
.messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build()))
739+
.body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson())
740+
.receiptHandle("receipt-handle")
741+
.build();
742+
743+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build());
744+
doThrow(NoSuchKeyException.class).when(mockS3).getObject(any(GetObjectRequest.class));
745+
746+
ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build();
747+
try {
748+
sqsExtended.receiveMessage(messageRequest);
749+
fail("Expected exception after receiving NoSuchKeyException from S3 was not thrown.");
750+
} catch (SdkException e) {
751+
assertEquals(NoSuchKeyException.class.getName(), e.getCause().getClass().getName());
752+
verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class));
753+
}
754+
}
755+
699756
private DeleteMessageBatchRequest generateLargeDeleteBatchRequest(List<String> originalReceiptHandles) {
700757
List<DeleteMessageBatchRequestEntry> deleteEntries = IntStream.range(0, originalReceiptHandles.size())
701758
.mapToObj(i -> DeleteMessageBatchRequestEntry.builder()

0 commit comments

Comments
 (0)