Skip to content

Commit eb75f07

Browse files
committed
Adding ExtendedClientConfiguration parameter 'retainS3Messages' that when set to 'true' prevents the companion S3 object of a message being removed from S3 on the sqs 'deleteMessage()' action. The S3 bucket will then retain a log of all messages (if the 'alwaysThroughS3' is also set to true) after they have been processed and deleted.
Also added the S3KeyGenerator interface and a default implementation to the ExtendedClientConfiguration class. The S3KeyGenerator interface allows custom implementations to add structure to the S3 bucket objects create (such as partionting on date created).
1 parent 821fe23 commit eb75f07

File tree

6 files changed

+291
-5
lines changed

6 files changed

+291
-5
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
target/
2+
*.iml

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,8 @@ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) {
540540
String receiptHandle = deleteMessageRequest.getReceiptHandle();
541541
String origReceiptHandle = receiptHandle;
542542
if (isS3ReceiptHandle(receiptHandle)) {
543-
deleteMessagePayloadFromS3(receiptHandle);
543+
if (!clientConfiguration.isRetainS3Messages())
544+
deleteMessagePayloadFromS3(receiptHandle);
544545
origReceiptHandle = getOrigReceiptHandle(receiptHandle);
545546
}
546547
deleteMessageRequest.setReceiptHandle(origReceiptHandle);
@@ -817,7 +818,8 @@ public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest del
817818
String receiptHandle = entry.getReceiptHandle();
818819
String origReceiptHandle = receiptHandle;
819820
if (isS3ReceiptHandle(receiptHandle)) {
820-
deleteMessagePayloadFromS3(receiptHandle);
821+
if (!clientConfiguration.isRetainS3Messages())
822+
deleteMessagePayloadFromS3(receiptHandle);
821823
origReceiptHandle = getOrigReceiptHandle(receiptHandle);
822824
}
823825
entry.setReceiptHandle(origReceiptHandle);
@@ -1084,7 +1086,7 @@ private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEnt
10841086

10851087
checkMessageAttributes(batchEntry.getMessageAttributes());
10861088

1087-
String s3Key = UUID.randomUUID().toString();
1089+
String s3Key = clientConfiguration.getS3KeyGenerator().generateObjectKey(batchEntry);
10881090

10891091
// Read the content of the message from message body
10901092
String messageContentStr = batchEntry.getMessageBody();
@@ -1117,7 +1119,7 @@ private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageReques
11171119

11181120
checkMessageAttributes(sendMessageRequest.getMessageAttributes());
11191121

1120-
String s3Key = UUID.randomUUID().toString();
1122+
String s3Key = clientConfiguration.getS3KeyGenerator().generateObjectKey(sendMessageRequest);
11211123

11221124
// Read the content of the message from message body
11231125
String messageContentStr = sendMessageRequest.getMessageBody();

src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java

+78
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
import com.amazonaws.AmazonClientException;
1919
import com.amazonaws.services.s3.AmazonS3;
20+
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
21+
import com.amazonaws.services.sqs.model.SendMessageRequest;
2022
import org.apache.commons.logging.Log;
2123
import org.apache.commons.logging.LogFactory;
2224
import org.apache.http.annotation.NotThreadSafe;
2325

2426
import java.util.List;
27+
import java.util.UUID;
2528

2629
/**
2730
* Amazon SQS extended client configuration options such as Amazon S3 client,
@@ -35,7 +38,12 @@ public class ExtendedClientConfiguration {
3538
private String s3BucketName;
3639
private boolean largePayloadSupport = false;
3740
private boolean alwaysThroughS3 = false;
41+
private boolean retainS3Messages = false;
3842
private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD;
43+
private S3KeyGenerator s3KeyGenerator = new S3KeyGenerator() {
44+
public String generateObjectKey(SendMessageRequest sendMessageRequest) { return UUID.randomUUID().toString();}
45+
public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { return UUID.randomUUID().toString();}
46+
};
3947

4048
public ExtendedClientConfiguration() {
4149
s3 = null;
@@ -47,7 +55,9 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
4755
this.s3BucketName = other.s3BucketName;
4856
this.largePayloadSupport = other.largePayloadSupport;
4957
this.alwaysThroughS3 = other.alwaysThroughS3;
58+
this.retainS3Messages = other.retainS3Messages;
5059
this.messageSizeThreshold = other.messageSizeThreshold;
60+
this.s3KeyGenerator = other.s3KeyGenerator;
5161
}
5262

5363
/**
@@ -214,4 +224,72 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3)
214224
public boolean isAlwaysThroughS3() {
215225
return alwaysThroughS3;
216226
}
227+
228+
/**
229+
* Sets whether or not messages are deleted in S3 when they are delete from
230+
* the queue.
231+
*
232+
* @param retainS3Messages
233+
* Whether or not messages are deleted in S3 when they are delete
234+
* from the queue. Default: false
235+
*/
236+
public void setRetainS3Messages(boolean retainS3Messages) {
237+
this.retainS3Messages = retainS3Messages;
238+
}
239+
240+
/**
241+
* Sets whether or not messages are deleted in S3 when they are delete from
242+
* the queue.
243+
*
244+
* @param retainS3Messages
245+
* Whether or not messages are deleted in S3 when they are delete
246+
* from the queue. Default: false
247+
* @return the updated ExtendedClientConfiguration object.
248+
*/
249+
public ExtendedClientConfiguration withRetainS3Messages(boolean retainS3Messages) {
250+
setRetainS3Messages(retainS3Messages);
251+
return this;
252+
}
253+
254+
/**
255+
* Checks whether or not messages are deleted in S3 when they are delete from
256+
* the queue.
257+
*
258+
* @return True if messages are delete when they are deleted from the queue.
259+
* Default: false
260+
*/
261+
public boolean isRetainS3Messages() {
262+
return retainS3Messages;
263+
}
264+
265+
/**
266+
* Get the S3KeyGenerator used to generate the S3 object keys for new messages.
267+
*
268+
* @return the S3KeyGenerator used to generate the S3 object keys for new messages.
269+
*/
270+
public S3KeyGenerator getS3KeyGenerator() {
271+
return s3KeyGenerator;
272+
}
273+
274+
/**
275+
* Set the S3KeyGenerator used to generate the S3 object keys for new messages.
276+
*
277+
* @param s3KeyGenerator the S3KeyGenerator used to generate the S3 object keys for
278+
* new messages.
279+
*/
280+
public void setS3KeyGenerator(S3KeyGenerator s3KeyGenerator) {
281+
this.s3KeyGenerator = s3KeyGenerator;
282+
}
283+
284+
/**
285+
* Set the S3KeyGenerator used to generate the S3 object keys for new messages.
286+
*
287+
* @param s3KeyGenerator the S3KeyGenerator used to generate the S3 object keys for
288+
* new messages.
289+
* @return the updated ExtendedClientConfiguration object.
290+
*/
291+
public ExtendedClientConfiguration withS3KeyGenerator(S3KeyGenerator s3KeyGenerator) {
292+
setS3KeyGenerator(s3KeyGenerator);
293+
return this;
294+
}
217295
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amazon.sqs.javamessaging;
17+
18+
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
19+
import com.amazonaws.services.sqs.model.SendMessageRequest;
20+
21+
/**
22+
* Defines the contract of a S3 object key generator to use when persisting
23+
* SQS messages to an S3 bucket.
24+
*
25+
* Your implementation must always return a unique key, even if it is passed
26+
* the same SendMessageRequest or SendMessageBatchRequestEntry instance.
27+
*/
28+
public interface S3KeyGenerator {
29+
/**
30+
* Method to generate a unique S3 object key from a SendMessageRequest instance.
31+
*
32+
* @param sendMessageRequest the request object for the new message.
33+
* @return a unique S3 object key.
34+
*/
35+
String generateObjectKey(SendMessageRequest sendMessageRequest);
36+
37+
/**
38+
* Method to generate a unique S3 object key from a SendMessageBatchRequestEntry
39+
* instance.
40+
*
41+
* @param batchEntry the batch request object for the new message.
42+
* @return a unique S3 object key.
43+
*/
44+
String generateObjectKey(SendMessageBatchRequestEntry batchEntry);
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amazon.sqs.javamessaging;
17+
18+
import com.amazonaws.auth.AWSCredentials;
19+
import com.amazonaws.auth.BasicAWSCredentials;
20+
import com.amazonaws.services.s3.AmazonS3;
21+
import com.amazonaws.services.s3.AmazonS3Client;
22+
import com.amazonaws.services.sqs.AmazonSQS;
23+
import com.amazonaws.services.sqs.AmazonSQSClient;
24+
import com.amazonaws.services.sqs.model.*;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
28+
import java.text.SimpleDateFormat;
29+
import java.util.*;
30+
31+
32+
/**
33+
* Tests the AmazonSQSExtendedClient class.
34+
*/
35+
public class AmazonSQSExtendedClientIntegrationTest {
36+
37+
private AmazonSQSClient client;
38+
private AmazonSQS sqs;
39+
private AmazonS3 s3;
40+
private static final AWSCredentials AWS_CREDS = new BasicAWSCredentials("[YOUR_AWS_KEY]", "[YOUR_AWS_SECRET]");
41+
private static final String S3_BUCKET_NAME = "[YOUR_EXISTING_S3_BUCKET_NAME]";
42+
private static final String SQS_QUEUE_URL = "[YOUR_EXISTING_SQS_QUEUE_NAME]";
43+
private static final int SQS_SIZE_LIMIT = 262144;
44+
45+
/**
46+
* A simple S3KeyGenerator implementation that pre-pends the current date in front of the unique S3 key name.
47+
*/
48+
class DateKeyGenerator implements S3KeyGenerator {
49+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
50+
51+
@Override
52+
public String generateObjectKey(SendMessageRequest sendMessageRequest) {
53+
return formatter.format(Calendar.getInstance().getTime()) + "/" + UUID.randomUUID().toString();
54+
}
55+
56+
@Override
57+
public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) {
58+
return formatter.format(Calendar.getInstance().getTime()) + "/" + UUID.randomUUID().toString();
59+
}
60+
}
61+
62+
/**
63+
* A S3KeyGenerator implementation that uses the message meta data to construct a unique S3 key name.
64+
*/
65+
class MetaBasedKeyGenerator implements S3KeyGenerator {
66+
67+
private String metaKey;
68+
69+
public MetaBasedKeyGenerator(String metaKey) {
70+
this.metaKey = metaKey;
71+
}
72+
73+
private String getPrefix(Map<String, MessageAttributeValue> attribsMap) {
74+
if (attribsMap.containsKey(metaKey))
75+
return attribsMap.get(metaKey).getStringValue();
76+
else
77+
return "NOT_SPECIFIED";
78+
}
79+
80+
@Override
81+
public String generateObjectKey(SendMessageRequest sendMessageRequest) {
82+
return getPrefix(sendMessageRequest.getMessageAttributes()) + "/" + UUID.randomUUID().toString();
83+
}
84+
85+
@Override
86+
public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) {
87+
return getPrefix(batchEntry.getMessageAttributes()) + "/" + UUID.randomUUID().toString();
88+
}
89+
}
90+
91+
static final String EVENT_TYPE = "EVENT_TYPE";
92+
93+
@Before
94+
public void setupClient() {
95+
96+
s3 = new AmazonS3Client(AWS_CREDS);
97+
s3.setEndpoint("s3-eu-west-1.amazonaws.com");
98+
99+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
100+
.withLargePayloadSupportEnabled(s3, S3_BUCKET_NAME)
101+
//.withS3KeyGenerator(new DateKeyGenerator());
102+
.withS3KeyGenerator(new MetaBasedKeyGenerator(EVENT_TYPE))
103+
.withRetainS3Messages(true);
104+
105+
client = new AmazonSQSClient(AWS_CREDS); //mock(AmazonSQSClient.class)
106+
sqs = new AmazonSQSExtendedClient(client, extendedClientConfiguration);
107+
}
108+
109+
/* UNCOMMENT THIS TEST ONCE YOU HAVE FILLED IN YOUR AWS DETAILS IN THE STATIC VARS ABOVE
110+
111+
@Test
112+
public void exercise() {
113+
114+
int messageLength = SQS_SIZE_LIMIT + 1;
115+
String messageBody = generateString(messageLength);
116+
117+
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
118+
messageAttributeValue.setDataType("String");
119+
messageAttributeValue.setStringValue("Payment_Success");
120+
121+
Map<String,MessageAttributeValue> metaData = new HashMap<String,MessageAttributeValue>();
122+
metaData.put(EVENT_TYPE, messageAttributeValue);
123+
124+
SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody)
125+
.withMessageAttributes(metaData);
126+
127+
SendMessageResult sendResult = sqs.sendMessage(messageRequest);
128+
System.out.println(sendResult);
129+
130+
ReceiveMessageRequest requestMsg = new ReceiveMessageRequest(SQS_QUEUE_URL).withMaxNumberOfMessages(1);
131+
132+
ReceiveMessageResult receiveResult = sqs.receiveMessage(requestMsg);
133+
//System.out.println(receiveResult);
134+
135+
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(SQS_QUEUE_URL, receiveResult.getMessages().get(0).getReceiptHandle());
136+
137+
sqs.deleteMessage(deleteRequest);
138+
}*/
139+
140+
141+
private String generateString(int messageLength) {
142+
char[] charArray = new char[messageLength];
143+
Arrays.fill(charArray, 'x');
144+
return new String(charArray);
145+
}
146+
}

src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717

1818
import com.amazonaws.services.s3.AmazonS3;
1919
import com.amazonaws.services.s3.model.PutObjectRequest;
20+
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
21+
import com.amazonaws.services.sqs.model.SendMessageRequest;
2022
import junit.framework.Assert;
2123
import org.junit.Before;
2224
import org.junit.Test;
25+
26+
import java.util.UUID;
27+
2328
import static org.mockito.Matchers.isA;
2429
import static org.mockito.Mockito.*;
2530

@@ -42,20 +47,29 @@ public void testCopyConstructor() {
4247
when(s3.putObject(isA(PutObjectRequest.class))).thenReturn(null);
4348

4449
boolean alwaysThroughS3 = true;
50+
boolean retainS3Messages = true;
4551
int messageSizeThreshold = 500;
4652

4753
ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration();
4854

55+
S3KeyGenerator s3KeyGenerator = new S3KeyGenerator() {
56+
public String generateObjectKey(SendMessageRequest sendMessageRequest) { return UUID.randomUUID().toString();}
57+
public String generateObjectKey(SendMessageBatchRequestEntry batchEntry) { return UUID.randomUUID().toString();}
58+
};
59+
4960
extendedClientConfig.withLargePayloadSupportEnabled(s3, s3BucketName)
50-
.withAlwaysThroughS3(alwaysThroughS3).withMessageSizeThreshold(messageSizeThreshold);
61+
.withAlwaysThroughS3(alwaysThroughS3).withMessageSizeThreshold(messageSizeThreshold)
62+
.withRetainS3Messages(retainS3Messages).withS3KeyGenerator(s3KeyGenerator);
5163

5264
ExtendedClientConfiguration newExtendedClientConfig = new ExtendedClientConfiguration(extendedClientConfig);
5365

5466
Assert.assertEquals(s3, newExtendedClientConfig.getAmazonS3Client());
5567
Assert.assertEquals(s3BucketName, newExtendedClientConfig.getS3BucketName());
5668
Assert.assertTrue(newExtendedClientConfig.isLargePayloadSupportEnabled());
5769
Assert.assertEquals(alwaysThroughS3, newExtendedClientConfig.isAlwaysThroughS3());
70+
Assert.assertEquals(retainS3Messages, newExtendedClientConfig.isRetainS3Messages());
5871
Assert.assertEquals(messageSizeThreshold, newExtendedClientConfig.getMessageSizeThreshold());
72+
Assert.assertEquals(s3KeyGenerator, newExtendedClientConfig.getS3KeyGenerator());
5973

6074
Assert.assertNotSame(newExtendedClientConfig, extendedClientConfig);
6175
}

0 commit comments

Comments
 (0)