From e798aa266d9884e331eb853e47178c10e9e7bd06 Mon Sep 17 00:00:00 2001 From: Gabija Balvociute Date: Thu, 15 Sep 2022 19:30:55 -0700 Subject: [PATCH] Delete SQS messages when S3 payload not found --- .../AmazonSQSExtendedClient.java | 16 +++++- .../AmazonSQSExtendedClientTest.java | 57 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index 7520706..8f1e733 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -33,6 +33,7 @@ import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.util.VersionInfo; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException; @@ -339,7 +340,20 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag String largeMessagePointer = message.body(); largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); - messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer)); + try { + messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer)); + } catch (SdkException e) { + if (e.getCause() instanceof NoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) { + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest + .builder() + .queueUrl(receiveMessageRequest.queueUrl()) + .receiptHandle(message.receiptHandle()) + .build(); + deleteMessage(deleteMessageRequest); + LOG.warn("Message deleted from SQS since payload with pointer could not be found in S3."); + continue; + } else throw e; + } // remove the additional attribute before returning the message // to user. diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index d2e7798..07925a0 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -31,12 +31,14 @@ import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.core.ApiName; import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.sqs.SqsClient; @@ -60,6 +62,7 @@ import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.USER_AGENT_VERSION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; @@ -641,6 +644,60 @@ private void testReceiveMessage_when_MessageIsLarge(String reservedAttributeName verify(mockS3, times(1)).getObject(isA(GetObjectRequest.class)); } + @Test + public void testReceiveMessage_when_ignorePayloadNotFound_then_messageWithPayloadNotFoundIsDeletedFromSQS() { + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withIgnorePayloadNotFound(true); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + String receiptHandle = "receipt-handle"; + Message message = Message.builder() + .messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build())) + .body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson()) + .receiptHandle(receiptHandle) + .build(); + + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build()); + doThrow(NoSuchKeyException.class).when(mockS3).getObject(any(GetObjectRequest.class)); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().queueUrl(SQS_QUEUE_URL).build(); + ReceiveMessageResponse receiveMessageResponse = sqsExtended.receiveMessage(messageRequest); + + Assert.assertTrue(receiveMessageResponse.messages().isEmpty()); + + ArgumentCaptor deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); + verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture()); + Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().queueUrl()); + Assert.assertEquals(receiptHandle, deleteMessageRequestArgumentCaptor.getValue().receiptHandle()); + } + + @Test + public void testReceiveMessage_when_ignorePayloadNotFoundIsFalse_then_messageWithPayloadNotFoundThrowsException() { + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withIgnorePayloadNotFound(false); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + Message message = Message.builder() + .messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build())) + .body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson()) + .receiptHandle("receipt-handle") + .build(); + + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build()); + doThrow(NoSuchKeyException.class).when(mockS3).getObject(any(GetObjectRequest.class)); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build(); + try { + sqsExtended.receiveMessage(messageRequest); + fail("Expected exception after receiving NoSuchKeyException from S3 was not thrown."); + } catch (SdkException e) { + assertEquals(NoSuchKeyException.class.getName(), e.getCause().getClass().getName()); + verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class)); + } + } + private DeleteMessageBatchRequest generateLargeDeleteBatchRequest(List originalReceiptHandles) { List deleteEntries = IntStream.range(0, originalReceiptHandles.size()) .mapToObj(i -> DeleteMessageBatchRequestEntry.builder()