Skip to content

Commit dcac111

Browse files
author
Evangilo Morais
committed
add support to set a prefix for the S3 key #7
1 parent 50ee609 commit dcac111

File tree

5 files changed

+205
-9
lines changed

5 files changed

+205
-9
lines changed

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Optional;
25+
import java.util.UUID;
2526

2627
import org.apache.commons.logging.Log;
2728
import org.apache.commons.logging.LogFactory;
@@ -965,7 +966,7 @@ private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEnt
965966
updateMessageAttributePayloadSize(batchEntry.messageAttributes(), messageContentSize));
966967

967968
// Store the message content in S3.
968-
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr);
969+
String largeMessagePointer = storeOriginalPayload(messageContentStr);
969970
batchEntryBuilder.messageBody(largeMessagePointer);
970971

971972
return batchEntryBuilder.build();
@@ -984,12 +985,20 @@ private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageReques
984985
updateMessageAttributePayloadSize(sendMessageRequest.messageAttributes(), messageContentSize));
985986

986987
// Store the message content in S3.
987-
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr);
988+
String largeMessagePointer = storeOriginalPayload(messageContentStr);
988989
sendMessageRequestBuilder.messageBody(largeMessagePointer);
989990

990991
return sendMessageRequestBuilder.build();
991992
}
992993

994+
private String storeOriginalPayload(String messageContentStr) {
995+
String s3KeyPrefix = clientConfiguration.getS3KeyPrefix();
996+
if (StringUtils.isBlank(s3KeyPrefix)) {
997+
return payloadStore.storeOriginalPayload(messageContentStr);
998+
}
999+
return payloadStore.storeOriginalPayload(messageContentStr, s3KeyPrefix + UUID.randomUUID());
1000+
}
1001+
9931002
private Map<String, MessageAttributeValue> updateMessageAttributePayloadSize(
9941003
Map<String, MessageAttributeValue> messageAttributes, Long messageContentSize) {
9951004
Map<String, MessageAttributeValue> updatedMessageAttributes = new HashMap<>(messageAttributes);

src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java

+71
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,36 @@
1515

1616
package com.amazon.sqs.javamessaging;
1717

18+
import org.apache.commons.logging.Log;
19+
import org.apache.commons.logging.LogFactory;
1820
import software.amazon.awssdk.annotations.NotThreadSafe;
21+
import software.amazon.awssdk.core.exception.SdkClientException;
1922
import software.amazon.awssdk.services.s3.S3Client;
2023
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
24+
import software.amazon.awssdk.utils.StringUtils;
2125
import software.amazon.payloadoffloading.PayloadStorageConfiguration;
2226
import software.amazon.payloadoffloading.ServerSideEncryptionStrategy;
2327

28+
import java.util.regex.Pattern;
29+
2430

2531
/**
2632
* Amazon SQS extended client configuration options such as Amazon S3 client,
2733
* bucket name, and message size threshold for large-payload messages.
2834
*/
2935
@NotThreadSafe
3036
public class ExtendedClientConfiguration extends PayloadStorageConfiguration {
37+
private static final Log LOG = LogFactory.getLog(ExtendedClientConfiguration.class);
38+
39+
private static final int UUID_LENGTH = 36;
40+
private static final int MAX_S3_KEY_LENGTH = 1024;
41+
private static final int MAX_S3_KEY_PREFIX_LENGTH = MAX_S3_KEY_LENGTH - UUID_LENGTH;
42+
private static final Pattern INVALID_S3_PREFIX_KEY_CHARACTERS_PATTERN = Pattern.compile("[^a-zA-Z0-9./_-]");
3143

3244
private boolean cleanupS3Payload = true;
3345
private boolean useLegacyReservedAttributeName = true;
3446
private boolean ignorePayloadNotFound = false;
47+
private String s3KeyPrefix = "";
3548

3649
public ExtendedClientConfiguration() {
3750
super();
@@ -43,6 +56,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
4356
this.cleanupS3Payload = other.doesCleanupS3Payload();
4457
this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName();
4558
this.ignorePayloadNotFound = other.ignoresPayloadNotFound();
59+
this.s3KeyPrefix = other.s3KeyPrefix;
4660
}
4761

4862
/**
@@ -128,6 +142,63 @@ public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePaylo
128142
return this;
129143
}
130144

145+
/**
146+
* Sets a string that will be used as prefix of the S3 Key.
147+
*
148+
* @param s3KeyPrefix
149+
* A S3 key prefix value
150+
*/
151+
public void setS3KeyPrefix(String s3KeyPrefix) {
152+
String trimmedPrefix = StringUtils.trimToEmpty(s3KeyPrefix);
153+
154+
if (trimmedPrefix.length() > MAX_S3_KEY_PREFIX_LENGTH) {
155+
String errorMessage = "The S3 key prefix length must not be greater than " + MAX_S3_KEY_PREFIX_LENGTH;
156+
LOG.error(errorMessage);
157+
throw SdkClientException.create(errorMessage);
158+
}
159+
160+
if (trimmedPrefix.startsWith(".") || trimmedPrefix.startsWith("/")) {
161+
String errorMessage = "The S3 key prefix must not starts with '.' or '/'";
162+
LOG.error(errorMessage);
163+
throw SdkClientException.create(errorMessage);
164+
}
165+
166+
if (trimmedPrefix.contains("..")) {
167+
String errorMessage = "The S3 key prefix must not contains the string '..'";
168+
LOG.error(errorMessage);
169+
throw SdkClientException.create(errorMessage);
170+
}
171+
172+
if (INVALID_S3_PREFIX_KEY_CHARACTERS_PATTERN.matcher(trimmedPrefix).find()) {
173+
String errorMessage = "The S3 key prefix contain invalid characters. The allowed characters are: letters, digits, '/', '_', '-', and '.'";
174+
LOG.error(errorMessage);
175+
throw SdkClientException.create(errorMessage);
176+
}
177+
178+
this.s3KeyPrefix = trimmedPrefix;
179+
}
180+
181+
/**
182+
* Sets a string that will be used as prefix of the S3 Key.
183+
*
184+
* @param s3KeyPrefix
185+
* A S3 key prefix value
186+
*
187+
* @return the updated ExtendedClientConfiguration object.
188+
*/
189+
public ExtendedClientConfiguration withS3KeyPrefix(String s3KeyPrefix) {
190+
setS3KeyPrefix(s3KeyPrefix);
191+
return this;
192+
}
193+
194+
/**
195+
* Gets the S3 key prefix
196+
* @return the prefix value which is being used for compose the S3 key.
197+
*/
198+
public String getS3KeyPrefix() {
199+
return this.s3KeyPrefix;
200+
}
201+
131202
/**
132203
* Checks whether or not clean up large objects in S3 is enabled.
133204
*

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java

+52-7
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515

1616
package com.amazon.sqs.javamessaging;
1717

18+
import static com.amazon.sqs.javamessaging.StringTestUtil.generateStringWithLength;
19+
20+
import org.junit.jupiter.api.AfterEach;
1821
import org.junit.jupiter.api.BeforeEach;
1922
import org.junit.jupiter.api.Test;
2023
import org.mockito.ArgumentCaptor;
24+
import org.mockito.MockedStatic;
2125
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2226
import software.amazon.awssdk.core.ApiName;
2327
import software.amazon.awssdk.core.ResponseInputStream;
@@ -62,9 +66,11 @@
6266
import static org.junit.jupiter.api.Assertions.assertNull;
6367
import static org.junit.jupiter.api.Assertions.assertTrue;
6468
import static org.mockito.ArgumentMatchers.any;
69+
import static org.mockito.ArgumentMatchers.argThat;
6570
import static org.mockito.ArgumentMatchers.eq;
6671
import static org.mockito.Mockito.isA;
6772
import static org.mockito.Mockito.mock;
73+
import static org.mockito.Mockito.mockStatic;
6874
import static org.mockito.Mockito.never;
6975
import static org.mockito.Mockito.spy;
7076
import static org.mockito.Mockito.times;
@@ -82,11 +88,16 @@ public class AmazonSQSExtendedClientTest {
8288
private SqsClient extendedSqsWithDefaultKMS;
8389
private SqsClient extendedSqsWithGenericReservedAttributeName;
8490
private SqsClient extendedSqsWithDeprecatedMethods;
91+
private SqsClient extendedSqsWithS3KeyPrefix;
8592
private SqsClient mockSqsBackend;
8693
private S3Client mockS3;
94+
95+
private MockedStatic<UUID> uuidMockStatic;
8796
private static final String S3_BUCKET_NAME = "test-bucket-name";
8897
private static final String SQS_QUEUE_URL = "test-queue-url";
8998
private static final String S3_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID = "test-customer-managed-kms-key-id";
99+
private static final String S3_KEY_PREFIX = "test-s3-key-prefix";
100+
private static final String S3_KEY_UUID = "test-s3-key-uuid";
90101

91102
private static final int LESS_THAN_SQS_SIZE_LIMIT = 3;
92103
private static final int SQS_SIZE_LIMIT = 262144;
@@ -101,6 +112,7 @@ public class AmazonSQSExtendedClientTest {
101112

102113
@BeforeEach
103114
public void setupClients() {
115+
uuidMockStatic = mockStatic(UUID.class);
104116
mockS3 = mock(S3Client.class);
105117
mockSqsBackend = mock(SqsClient.class);
106118
when(mockS3.putObject(isA(PutObjectRequest.class), isA(RequestBody.class))).thenReturn(null);
@@ -121,11 +133,25 @@ public void setupClients() {
121133

122134
ExtendedClientConfiguration extendedClientConfigurationDeprecated = new ExtendedClientConfiguration().withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME);
123135

136+
ExtendedClientConfiguration extendedClientConfigurationWithS3KeyPrefix = new ExtendedClientConfiguration()
137+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
138+
.withS3KeyPrefix(S3_KEY_PREFIX);
139+
140+
UUID uuidMock = mock(UUID.class);
141+
when(uuidMock.toString()).thenReturn(S3_KEY_UUID);
142+
uuidMockStatic.when(UUID::randomUUID).thenReturn(uuidMock);
143+
124144
extendedSqsWithDefaultConfig = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
125145
extendedSqsWithCustomKMS = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithCustomKMS));
126146
extendedSqsWithDefaultKMS = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithDefaultKMS));
127147
extendedSqsWithGenericReservedAttributeName = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithGenericReservedAttributeName));
128148
extendedSqsWithDeprecatedMethods = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationDeprecated));
149+
extendedSqsWithS3KeyPrefix = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithS3KeyPrefix));
150+
}
151+
152+
@AfterEach
153+
public void tearDown() {
154+
uuidMockStatic.close();
129155
}
130156

131157
@Test
@@ -617,6 +643,32 @@ public void testWhenSendMessageWIthCannedAccessControlListDefined() {
617643
assertEquals(expected, captor.getValue().acl());
618644
}
619645

646+
@Test
647+
public void testWhenSendLargeMessageWithS3PrefixKeyDefined() {
648+
String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT);
649+
650+
SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build();
651+
652+
extendedSqsWithS3KeyPrefix.sendMessage(messageRequest);
653+
654+
verify(mockS3, times(1)).putObject(
655+
argThat((PutObjectRequest obj) -> obj.key().equals(S3_KEY_PREFIX + S3_KEY_UUID)),
656+
isA(RequestBody.class));
657+
}
658+
659+
@Test
660+
public void testWhenSendLargeMessageWithUndefinedS3PrefixKey() {
661+
String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT);
662+
663+
SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build();
664+
665+
extendedSqsWithDefaultConfig.sendMessage(messageRequest);
666+
667+
verify(mockS3, times(1)).putObject(
668+
argThat((PutObjectRequest obj) -> obj.key().equals(S3_KEY_UUID)),
669+
isA(RequestBody.class));
670+
}
671+
620672
private void testReceiveMessage_when_MessageIsLarge(String reservedAttributeName) {
621673
String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson();
622674
Message message = Message.builder()
@@ -665,11 +717,4 @@ private String getLargeReceiptHandle(String s3Key, String originalReceiptHandle)
665717
private String getSampleLargeReceiptHandle(String originalReceiptHandle) {
666718
return getLargeReceiptHandle(UUID.randomUUID().toString(), originalReceiptHandle);
667719
}
668-
669-
private String generateStringWithLength(int messageLength) {
670-
char[] charArray = new char[messageLength];
671-
Arrays.fill(charArray, 'x');
672-
return new String(charArray);
673-
}
674-
675720
}

src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java

+60
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515

1616
package com.amazon.sqs.javamessaging;
1717

18+
import static com.amazon.sqs.javamessaging.StringTestUtil.generateStringWithLength;
19+
1820
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.params.ParameterizedTest;
22+
import org.junit.jupiter.params.provider.ValueSource;
23+
import software.amazon.awssdk.core.exception.SdkClientException;
1924
import software.amazon.awssdk.services.s3.S3Client;
2025
import software.amazon.payloadoffloading.ServerSideEncryptionFactory;
2126
import software.amazon.payloadoffloading.ServerSideEncryptionStrategy;
@@ -24,6 +29,7 @@
2429
import static org.junit.jupiter.api.Assertions.assertFalse;
2530
import static org.junit.jupiter.api.Assertions.assertNotNull;
2631
import static org.junit.jupiter.api.Assertions.assertNull;
32+
import static org.junit.jupiter.api.Assertions.assertThrows;
2733
import static org.junit.jupiter.api.Assertions.assertTrue;
2834
import static org.junit.jupiter.api.Assertions.assertNotSame;
2935
import static org.mockito.Mockito.mock;
@@ -164,4 +170,58 @@ public void testMessageSizeThreshold() {
164170
assertEquals(messageLength, extendedClientConfiguration.getPayloadSizeThreshold());
165171

166172
}
173+
174+
@ParameterizedTest
175+
@ValueSource(strings = {
176+
"test-s3-key-prefix",
177+
"TEST-S3-KEY-PREFIX",
178+
"test.s3.key.prefix",
179+
"test_s3_key_prefix",
180+
"test/s3/key/prefix/"
181+
})
182+
public void testS3keyPrefix(String s3KeyPrefix) {
183+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration();
184+
185+
extendedClientConfiguration.withS3KeyPrefix(s3KeyPrefix);
186+
187+
assertEquals(s3KeyPrefix, extendedClientConfiguration.getS3KeyPrefix());
188+
}
189+
190+
@Test
191+
public void testTrimS3keyPrefix() {
192+
String s3KeyPrefix = "test-s3-key-prefix";
193+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration();
194+
195+
extendedClientConfiguration.withS3KeyPrefix(String.format(" %s ", s3KeyPrefix));
196+
197+
assertEquals(s3KeyPrefix, extendedClientConfiguration.getS3KeyPrefix());
198+
}
199+
200+
@ParameterizedTest
201+
@ValueSource(strings = {
202+
".test-s3-key-prefix",
203+
"./test-s3-key-prefix",
204+
"../test-s3-key-prefix",
205+
"/test-s3-key-prefix",
206+
"test..s3..key..prefix",
207+
"test-s3-key-prefix@",
208+
"test s3 key prefix"
209+
})
210+
public void testS3KeyPrefixWithInvalidCharacters(String s3KeyPrefix) {
211+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration();
212+
213+
assertThrows(SdkClientException.class, () -> extendedClientConfiguration.withS3KeyPrefix(s3KeyPrefix));
214+
}
215+
216+
@Test
217+
public void testS3keyPrefixWithALargeString() {
218+
int maxS3KeyLength = 1024;
219+
int uuidLength = 36;
220+
int maxS3KeyPrefixLength = maxS3KeyLength - uuidLength;
221+
String s3KeyPrefix = generateStringWithLength(maxS3KeyPrefixLength + 1);
222+
223+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration();
224+
225+
assertThrows(SdkClientException.class, () -> extendedClientConfiguration.withS3KeyPrefix(s3KeyPrefix));
226+
}
167227
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.amazon.sqs.javamessaging;
2+
3+
import java.util.Arrays;
4+
5+
public class StringTestUtil {
6+
public static String generateStringWithLength(int messageLength) {
7+
char[] charArray = new char[messageLength];
8+
Arrays.fill(charArray, 'x');
9+
return new String(charArray);
10+
}
11+
}

0 commit comments

Comments
 (0)