diff --git a/.gitignore b/.gitignore index 2f7896d..ffac74e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ target/ +*.iml diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index 737888a..8e8dc03 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -551,7 +551,8 @@ public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageReque String receiptHandle = deleteMessageRequest.getReceiptHandle(); String origReceiptHandle = receiptHandle; if (isS3ReceiptHandle(receiptHandle)) { - deleteMessagePayloadFromS3(receiptHandle); + if (!clientConfiguration.isRetainS3Messages()) + deleteMessagePayloadFromS3(receiptHandle); origReceiptHandle = getOrigReceiptHandle(receiptHandle); } deleteMessageRequest.setReceiptHandle(origReceiptHandle); @@ -913,7 +914,8 @@ public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest del String receiptHandle = entry.getReceiptHandle(); String origReceiptHandle = receiptHandle; if (isS3ReceiptHandle(receiptHandle)) { - deleteMessagePayloadFromS3(receiptHandle); + if (!clientConfiguration.isRetainS3Messages()) + deleteMessagePayloadFromS3(receiptHandle); origReceiptHandle = getOrigReceiptHandle(receiptHandle); } entry.setReceiptHandle(origReceiptHandle); @@ -1250,7 +1252,7 @@ private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEnt checkMessageAttributes(batchEntry.getMessageAttributes()); - String s3Key = UUID.randomUUID().toString(); + String s3Key = clientConfiguration.getS3KeyGenerator().generateObjectKey(batchEntry); // Read the content of the message from message body String messageContentStr = batchEntry.getMessageBody(); @@ -1283,7 +1285,7 @@ private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageReques checkMessageAttributes(sendMessageRequest.getMessageAttributes()); - String s3Key = UUID.randomUUID().toString(); + String s3Key = clientConfiguration.getS3KeyGenerator().generateObjectKey(sendMessageRequest); // Read the content of the message from message body String messageContentStr = sendMessageRequest.getMessageBody(); diff --git a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java index c88dd28..19dd9db 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java +++ b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java @@ -17,11 +17,14 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.SendMessageRequest; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.annotation.NotThreadSafe; import java.util.List; +import java.util.UUID; /** * Amazon SQS extended client configuration options such as Amazon S3 client, @@ -35,7 +38,12 @@ public class ExtendedClientConfiguration { private String s3BucketName; private boolean largePayloadSupport = false; private boolean alwaysThroughS3 = false; + private boolean retainS3Messages = false; private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD; + private S3KeyGenerator s3KeyGenerator = new S3KeyGenerator() { + public String generateObjectKey(SendMessageRequest sendMessageRequest) { return UUID.randomUUID().toString();} + public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { return UUID.randomUUID().toString();} + }; public ExtendedClientConfiguration() { s3 = null; @@ -47,7 +55,9 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) { this.s3BucketName = other.s3BucketName; this.largePayloadSupport = other.largePayloadSupport; this.alwaysThroughS3 = other.alwaysThroughS3; + this.retainS3Messages = other.retainS3Messages; this.messageSizeThreshold = other.messageSizeThreshold; + this.s3KeyGenerator = other.s3KeyGenerator; } /** @@ -214,4 +224,72 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) public boolean isAlwaysThroughS3() { return alwaysThroughS3; } + + /** + * Sets whether or not messages are deleted in S3 when they are delete from + * the queue. + * + * @param retainS3Messages + * Whether or not messages are deleted in S3 when they are delete + * from the queue. Default: false + */ + public void setRetainS3Messages(boolean retainS3Messages) { + this.retainS3Messages = retainS3Messages; + } + + /** + * Sets whether or not messages are deleted in S3 when they are delete from + * the queue. + * + * @param retainS3Messages + * Whether or not messages are deleted in S3 when they are delete + * from the queue. Default: false + * @return the updated ExtendedClientConfiguration object. + */ + public ExtendedClientConfiguration withRetainS3Messages(boolean retainS3Messages) { + setRetainS3Messages(retainS3Messages); + return this; + } + + /** + * Checks whether or not messages are deleted in S3 when they are delete from + * the queue. + * + * @return True if messages are delete when they are deleted from the queue. + * Default: false + */ + public boolean isRetainS3Messages() { + return retainS3Messages; + } + + /** + * Get the S3KeyGenerator used to generate the S3 object keys for new messages. + * + * @return the S3KeyGenerator used to generate the S3 object keys for new messages. + */ + public S3KeyGenerator getS3KeyGenerator() { + return s3KeyGenerator; + } + + /** + * Set the S3KeyGenerator used to generate the S3 object keys for new messages. + * + * @param s3KeyGenerator the S3KeyGenerator used to generate the S3 object keys for + * new messages. + */ + public void setS3KeyGenerator(S3KeyGenerator s3KeyGenerator) { + this.s3KeyGenerator = s3KeyGenerator; + } + + /** + * Set the S3KeyGenerator used to generate the S3 object keys for new messages. + * + * @param s3KeyGenerator the S3KeyGenerator used to generate the S3 object keys for + * new messages. + * @return the updated ExtendedClientConfiguration object. + */ + public ExtendedClientConfiguration withS3KeyGenerator(S3KeyGenerator s3KeyGenerator) { + setS3KeyGenerator(s3KeyGenerator); + return this; + } } diff --git a/src/main/java/com/amazon/sqs/javamessaging/S3KeyGenerator.java b/src/main/java/com/amazon/sqs/javamessaging/S3KeyGenerator.java new file mode 100644 index 0000000..717062f --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/S3KeyGenerator.java @@ -0,0 +1,45 @@ +/* + * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.SendMessageRequest; + +/** + * Defines the contract of a S3 object key generator to use when persisting + * SQS messages to an S3 bucket. + * + * Your implementation must always return a unique key, even if it is passed + * the same SendMessageRequest or SendMessageBatchRequestEntry instance. + */ +public interface S3KeyGenerator { + /** + * Method to generate a unique S3 object key from a SendMessageRequest instance. + * + * @param sendMessageRequest the request object for the new message. + * @return a unique S3 object key. + */ + String generateObjectKey(SendMessageRequest sendMessageRequest); + + /** + * Method to generate a unique S3 object key from a SendMessageBatchRequestEntry + * instance. + * + * @param batchEntry the batch request object for the new message. + * @return a unique S3 object key. + */ + String generateObjectKey(SendMessageBatchRequestEntry batchEntry); +} diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientIntegrationTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientIntegrationTest.java new file mode 100644 index 0000000..85cb260 --- /dev/null +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientIntegrationTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.*; +import org.junit.Before; +import org.junit.Test; + +import java.text.SimpleDateFormat; +import java.util.*; + + +/** + * Tests the AmazonSQSExtendedClient class. + */ +public class AmazonSQSExtendedClientIntegrationTest { + + private AmazonSQSClient client; + private AmazonSQS sqs; + private AmazonS3 s3; + private static final AWSCredentials AWS_CREDS = new BasicAWSCredentials("[YOUR_AWS_KEY]", "[YOUR_AWS_SECRET]"); + private static final String S3_BUCKET_NAME = "[YOUR_EXISTING_S3_BUCKET_NAME]"; + private static final String SQS_QUEUE_URL = "[YOUR_EXISTING_SQS_QUEUE_NAME]"; + private static final int SQS_SIZE_LIMIT = 262144; + + /** + * A simple S3KeyGenerator implementation that pre-pends the current date in front of the unique S3 key name. + */ + class DateKeyGenerator implements S3KeyGenerator { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); + + @Override + public String generateObjectKey(SendMessageRequest sendMessageRequest) { + return formatter.format(Calendar.getInstance().getTime()) + "/" + UUID.randomUUID().toString(); + } + + @Override + public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { + return formatter.format(Calendar.getInstance().getTime()) + "/" + UUID.randomUUID().toString(); + } + } + + /** + * A S3KeyGenerator implementation that uses the message meta data to construct a unique S3 key name. + */ + class MetaBasedKeyGenerator implements S3KeyGenerator { + + private String metaKey; + + public MetaBasedKeyGenerator(String metaKey) { + this.metaKey = metaKey; + } + + private String getPrefix(Map attribsMap) { + if (attribsMap.containsKey(metaKey)) + return attribsMap.get(metaKey).getStringValue(); + else + return "NOT_SPECIFIED"; + } + + @Override + public String generateObjectKey(SendMessageRequest sendMessageRequest) { + return getPrefix(sendMessageRequest.getMessageAttributes()) + "/" + UUID.randomUUID().toString(); + } + + @Override + public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { + return getPrefix(batchEntry.getMessageAttributes()) + "/" + UUID.randomUUID().toString(); + } + } + + static final String EVENT_TYPE = "EVENT_TYPE"; + + @Before + public void setupClient() { + + s3 = new AmazonS3Client(AWS_CREDS); + s3.setEndpoint("s3-eu-west-1.amazonaws.com"); + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withLargePayloadSupportEnabled(s3, S3_BUCKET_NAME) + //.withS3KeyGenerator(new DateKeyGenerator()); + .withS3KeyGenerator(new MetaBasedKeyGenerator(EVENT_TYPE)) + .withRetainS3Messages(true); + + client = new AmazonSQSClient(AWS_CREDS); //mock(AmazonSQSClient.class) + sqs = new AmazonSQSExtendedClient(client, extendedClientConfiguration); + } + + /* UNCOMMENT THIS TEST ONCE YOU HAVE FILLED IN YOUR AWS DETAILS IN THE STATIC VARS ABOVE + + @Test + public void exercise() { + + int messageLength = SQS_SIZE_LIMIT + 1; + String messageBody = generateString(messageLength); + + MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); + messageAttributeValue.setDataType("String"); + messageAttributeValue.setStringValue("Payment_Success"); + + Map metaData = new HashMap(); + metaData.put(EVENT_TYPE, messageAttributeValue); + + SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody) + .withMessageAttributes(metaData); + + SendMessageResult sendResult = sqs.sendMessage(messageRequest); + System.out.println(sendResult); + + ReceiveMessageRequest requestMsg = new ReceiveMessageRequest(SQS_QUEUE_URL).withMaxNumberOfMessages(1); + + ReceiveMessageResult receiveResult = sqs.receiveMessage(requestMsg); + //System.out.println(receiveResult); + + DeleteMessageRequest deleteRequest = new DeleteMessageRequest(SQS_QUEUE_URL, receiveResult.getMessages().get(0).getReceiptHandle()); + + sqs.deleteMessage(deleteRequest); + }*/ + + + private String generateString(int messageLength) { + char[] charArray = new char[messageLength]; + Arrays.fill(charArray, 'x'); + return new String(charArray); + } +} diff --git a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java index d6c1af3..343831a 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java @@ -17,9 +17,14 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.SendMessageRequest; import junit.framework.Assert; import org.junit.Before; import org.junit.Test; + +import java.util.UUID; + import static org.mockito.Matchers.isA; import static org.mockito.Mockito.*; @@ -42,12 +47,19 @@ public void testCopyConstructor() { when(s3.putObject(isA(PutObjectRequest.class))).thenReturn(null); boolean alwaysThroughS3 = true; + boolean retainS3Messages = true; int messageSizeThreshold = 500; ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration(); + S3KeyGenerator s3KeyGenerator = new S3KeyGenerator() { + public String generateObjectKey(SendMessageRequest sendMessageRequest) { return UUID.randomUUID().toString();} + public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { return UUID.randomUUID().toString();} + }; + extendedClientConfig.withLargePayloadSupportEnabled(s3, s3BucketName) - .withAlwaysThroughS3(alwaysThroughS3).withMessageSizeThreshold(messageSizeThreshold); + .withAlwaysThroughS3(alwaysThroughS3).withMessageSizeThreshold(messageSizeThreshold) + .withRetainS3Messages(retainS3Messages).withS3KeyGenerator(s3KeyGenerator); ExtendedClientConfiguration newExtendedClientConfig = new ExtendedClientConfiguration(extendedClientConfig); @@ -55,7 +67,9 @@ public void testCopyConstructor() { Assert.assertEquals(s3BucketName, newExtendedClientConfig.getS3BucketName()); Assert.assertTrue(newExtendedClientConfig.isLargePayloadSupportEnabled()); Assert.assertEquals(alwaysThroughS3, newExtendedClientConfig.isAlwaysThroughS3()); + Assert.assertEquals(retainS3Messages, newExtendedClientConfig.isRetainS3Messages()); Assert.assertEquals(messageSizeThreshold, newExtendedClientConfig.getMessageSizeThreshold()); + Assert.assertEquals(s3KeyGenerator, newExtendedClientConfig.getS3KeyGenerator()); Assert.assertNotSame(newExtendedClientConfig, extendedClientConfig); }