Skip to content

Commit c5e3a74

Browse files
committed
update payload offloading and address comments
1 parent 227ed8b commit c5e3a74

File tree

6 files changed

+710
-158
lines changed

6 files changed

+710
-158
lines changed

README.md

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ It saves the actual payload in S3 and publishes the reference of the stored S3 o
1313
<dependency>
1414
<groupId>software.amazon.sns</groupId>
1515
<artifactId>sns-extended-client</artifactId>
16-
<version>1.0.0</version>
16+
<version>2.0.0</version>
1717
<type>jar</type>
1818
</dependency>
1919
```
@@ -31,20 +31,18 @@ Below is the code sample that creates a sample topic and queue, subscribes the q
3131
```java
3232
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
3333
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
34-
import com.amazonaws.regions.Region;
35-
import com.amazonaws.regions.Regions;
36-
import com.amazonaws.services.s3.AmazonS3;
37-
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
38-
import com.amazonaws.services.sns.AmazonSNS;
39-
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
40-
import com.amazonaws.services.sns.model.CreateTopicRequest;
41-
import com.amazonaws.services.sns.model.PublishRequest;
42-
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;
43-
import com.amazonaws.services.sns.util.Topics;
44-
import com.amazonaws.services.sqs.AmazonSQS;
45-
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
46-
import com.amazonaws.services.sqs.model.CreateQueueRequest;
47-
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
34+
import software.amazon.awssdk.regions.Region;
35+
import software.amazon.awssdk.services.s3.S3Client;
36+
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
37+
import software.amazon.awssdk.services.sns.SnsClient;
38+
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
39+
import software.amazon.awssdk.services.sns.model.PublishRequest;
40+
import software.amazon.awssdk.services.sns.model.SetSubscriptionAttributesRequest;
41+
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
42+
import software.amazon.awssdk.services.sqs.SqsClient;
43+
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
44+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
45+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
4846
import software.amazon.sns.AmazonSNSExtendedClient;
4947
import software.amazon.sns.SNSExtendedClientConfiguration;
5048

@@ -54,59 +52,60 @@ public class Example {
5452
final String BUCKET_NAME = "extended-client-bucket";
5553
final String TOPIC_NAME = "extended-client-topic";
5654
final String QUEUE_NAME = "extended-client-queue";
57-
final Regions region = Regions.DEFAULT_REGION;
55+
final Region region = Region.US_WEST_2;
5856

5957
//Message threshold controls the maximum message size that will be allowed to be published
6058
//through SNS using the extended client. Payload of messages exceeding this value will be stored in
6159
//S3. The default value of this parameter is 256 KB which is the maximum message size in SNS (and SQS).
6260
final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32;
6361

6462
//Initialize SNS, SQS and S3 clients
65-
final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build();
66-
final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build();
67-
final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();
63+
final SnsClient snsClient = SnsClient.builder().region(region).build();
64+
final SqsClient sqsClient = SqsClient.builder().region(region).build();
65+
final S3Client s3Client = S3Client.builder().region(region).build();
6866

6967
//Create bucket, topic, queue and subscription
70-
s3Client.createBucket(BUCKET_NAME);
68+
s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
7169
final String topicArn = snsClient.createTopic(
72-
new CreateTopicRequest().withName(TOPIC_NAME)
73-
).getTopicArn();
70+
CreateTopicRequest.builder().name(TOPIC_NAME).build()
71+
).topicArn();
7472
final String queueUrl = sqsClient.createQueue(
75-
new CreateQueueRequest().withQueueName(QUEUE_NAME)
76-
).getQueueUrl();
77-
final String subscriptionArn = Topics.subscribeQueue(
78-
snsClient, sqsClient, topicArn, queueUrl
79-
);
73+
CreateQueueRequest.builder().queueName(QUEUE_NAME).build()
74+
).queueUrl();
75+
final String subscriptionArn = snsClient.subscribe(
76+
SubscribeRequest.builder().topicArn(topicArn).endpoint(queueUrl).build()
77+
).subscriptionArn();
8078

8179
//To read message content stored in S3 transparently through SQS extended client,
8280
//set the RawMessageDelivery subscription attribute to TRUE
83-
final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest();
84-
subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn);
85-
subscriptionAttributesRequest.setAttributeName("RawMessageDelivery");
86-
subscriptionAttributesRequest.setAttributeValue("TRUE");
81+
final SetSubscriptionAttributesRequest subscriptionAttributesRequest = SetSubscriptionAttributesRequest.builder()
82+
.subscriptionArn(subscriptionArn)
83+
.attributeName("RawMessageDelivery")
84+
.attributeValue("TRUE")
85+
.build();
8786
snsClient.setSubscriptionAttributes(subscriptionAttributesRequest);
8887

89-
//Initialize SNS extended client
88+
//Initialize SNS extended client
9089
//PayloadSizeThreshold triggers message content storage in S3 when the threshold is exceeded
9190
//To store all messages content in S3, use AlwaysThroughS3 flag
9291
final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
93-
.withPayloadSupportEnabled(s3Client, BUCKET_NAME)
94-
.withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
92+
.withPayloadSupportEnabled(s3Client, BUCKET_NAME)
93+
.withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
9594
final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration);
9695

9796
//Publish message via SNS with storage in S3
9897
final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
99-
snsExtendedClient.publish(topicArn, message);
98+
snsExtendedClient.publish(PublishRequest.builder().topicArn(topicArn).message(message).build());
10099

101100
//Initialize SQS extended client
102101
final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration()
103-
.withPayloadSupportEnabled(s3Client, BUCKET_NAME);
102+
.withPayloadSupportEnabled(s3Client, BUCKET_NAME);
104103
final AmazonSQSExtendedClient sqsExtendedClient =
105-
new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration);
104+
new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration);
106105

107106
//Read the message from the queue
108-
final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl);
109-
System.out.println("Received message is " + result.getMessages().get(0).getBody());
107+
final ReceiveMessageResponse response = sqsExtendedClient.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build());
108+
System.out.println("Received message is " + response.messages().get(0).body());
110109
}
111110
}
112111
```

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<dependency>
4545
<groupId>software.amazon.payloadoffloading</groupId>
4646
<artifactId>payloadoffloading-common</artifactId>
47-
<version>1.0.0</version>
47+
<version>2.0.0</version>
4848
</dependency>
4949
<dependency>
5050
<groupId>com.amazonaws</groupId>

src/main/java/software/amazon/sns/AmazonSNSExtendedClient.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
package software.amazon.sns;
22

33
import com.amazon.sqs.javamessaging.SQSExtendedClientConstants;
4-
import com.amazonaws.AmazonClientException;
5-
import com.amazonaws.util.StringUtils;
64
import org.apache.commons.logging.Log;
75
import org.apache.commons.logging.LogFactory;
86

97
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
8+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
109
import software.amazon.awssdk.core.SdkBytes;
1110
import software.amazon.awssdk.core.exception.SdkClientException;
1211
import software.amazon.awssdk.core.exception.SdkException;
1312
import software.amazon.awssdk.services.sns.SnsClient;
1413
import software.amazon.awssdk.services.sns.model.*;
14+
import software.amazon.awssdk.utils.StringUtils;
1515
import software.amazon.payloadoffloading.*;
1616

1717
import java.util.HashMap;
@@ -43,7 +43,7 @@ public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfigurati
4343
super(snsClient);
4444

4545
this.snsExtendedClientConfiguration = snsExtendedClientConfiguration;
46-
S3Dao s3Dao = new S3Dao(this.snsExtendedClientConfiguration.getAmazonS3Client());
46+
S3Dao s3Dao = new S3Dao(this.snsExtendedClientConfiguration.getS3Client());
4747
this.payloadStore = new S3BackedPayloadStore(s3Dao, this.snsExtendedClientConfiguration.getS3BucketName());
4848
}
4949

@@ -107,16 +107,20 @@ public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfigurati
107107
* Documentation</a>
108108
*/
109109
@Override
110-
public PublishResponse publish(PublishRequest publishRequest) {
111-
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.message())) {
110+
public PublishResponse publish(PublishRequest publishRequest) throws InvalidParameterException,
111+
InvalidParameterValueException, InternalErrorException, NotFoundException, EndpointDisabledException,
112+
PlatformApplicationDisabledException, AuthorizationErrorException, KmsDisabledException, KmsInvalidStateException,
113+
KmsNotFoundException, KmsOptInRequiredException, KmsThrottlingException, KmsAccessDeniedException,
114+
InvalidSecurityException, AwsServiceException, SdkClientException, SnsException {
115+
if (publishRequest == null || StringUtils.isEmpty(publishRequest.message())) {
112116
return super.publish(publishRequest);
113117
}
114118

115-
if (!StringUtils.isNullOrEmpty(publishRequest.messageStructure()) &&
119+
if (!StringUtils.isEmpty(publishRequest.messageStructure()) &&
116120
publishRequest.messageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) {
117121
String errorMessage = "SNS extended client does not support sending JSON messages.";
118122
LOGGER.error(errorMessage);
119-
throw new AmazonClientException(errorMessage);
123+
throw SdkClientException.create(errorMessage);
120124
}
121125

122126
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
@@ -151,7 +155,7 @@ private void checkMessageAttributes(Map<String, MessageAttributeValue> messageAt
151155
+ "] exceeds the maximum allowed for large-payload messages ["
152156
+ SQSExtendedClientConstants.MAX_ALLOWED_ATTRIBUTES + "].";
153157
LOGGER.error(errorMessage);
154-
throw new AmazonClientException(errorMessage);
158+
throw SdkClientException.create(errorMessage);
155159
}
156160

157161
MessageAttributeValue largePayloadAttributeName = messageAttributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME);
@@ -160,7 +164,7 @@ private void checkMessageAttributes(Map<String, MessageAttributeValue> messageAt
160164
String errorMessage = "Message attribute name " + SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME
161165
+ " is reserved for use by SNS extended client.";
162166
LOGGER.error(errorMessage);
163-
throw new AmazonClientException(errorMessage);
167+
throw SdkClientException.create(errorMessage);
164168
}
165169
}
166170

@@ -170,7 +174,7 @@ private void checkSizeOfMessageAttributes(long messageAttributeSize) {
170174
+ " bytes which is larger than the threshold of " + snsExtendedClientConfiguration.getPayloadSizeThreshold()
171175
+ " Bytes. Consider including the payload in the message body instead of message attributes.";
172176
LOGGER.error(errorMessage);
173-
throw new AmazonClientException(errorMessage);
177+
throw SdkClientException.create(errorMessage);
174178
}
175179
}
176180

@@ -213,8 +217,7 @@ private PublishRequest storeMessageInExtendedStore(PublishRequest publishRequest
213217
Long messageContentSize = Util.getStringSizeInBytes(messageContentStr);
214218

215219
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
216-
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr,
217-
messageContentSize);
220+
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr);
218221
publishRequestBuilder.message(largeMessagePointer);
219222

220223
MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder();

0 commit comments

Comments
 (0)