Skip to content

Upgrade to AWS SDK v2 #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>software.amazon.sns</groupId>
<artifactId>sns-extended-client</artifactId>
<version>1.0.0</version>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>Amazon SNS Extended Client Library for Java</name>
<description>An extension to the Amazon SNS client that enables sending messages up to 2GB via Amazon S3.
Expand All @@ -26,7 +26,7 @@
</licenses>

<properties>
<aws-java-sdk.version>1.11.817</aws-java-sdk.version>
<aws-java-sdk.version>2.14.19</aws-java-sdk.version>
</properties>

<developers>
Expand All @@ -53,13 +53,13 @@
<type>jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
Expand Down
113 changes: 68 additions & 45 deletions src/main/java/software/amazon/sns/AmazonSNSExtendedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@

import com.amazon.sqs.javamessaging.SQSExtendedClientConstants;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;
import software.amazon.payloadoffloading.*;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase {
static final String MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE = "json";
static final String USER_AGENT_HEADER_NAME = "User-Agent";
static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName());

private static final Log LOGGER = LogFactory.getLog(AmazonSNSExtendedClient.class);
private static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName());
private PayloadStore payloadStore;
private SNSExtendedClientConfiguration snsExtendedClientConfiguration;

Expand All @@ -34,7 +39,7 @@ public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase {
* @param snsExtendedClientConfiguration The sns extended client configuration options controlling the
* functionality of this client.
*/
public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) {
public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) {
super(snsClient);

this.snsExtendedClientConfiguration = snsExtendedClientConfiguration;
Expand All @@ -44,7 +49,8 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati

/**
* <p>
* Sends a message to an Amazon SNS topic or sends a text message (SMS message) directly to a phone number.
* Sends a message to an Amazon SNS topic, a text message (SMS message) directly to a phone number, or a message to
* a mobile platform endpoint (when you specify the <code>TargetArn</code>).
* </p>
* <p>
* If you send a message to a topic, Amazon SNS delivers the message to each endpoint that is subscribed to the
Expand All @@ -61,9 +67,14 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati
* </p>
* <p>
* For more information about formatting messages, see <a
* href="http://docs.aws.amazon.com/sns/latest/dg/mobile-push-send-custommessage.html">Send Custom Platform-Specific
* Payloads in Messages to Mobile Devices</a>.
* href="https://docs.aws.amazon.com/sns/latest/dg/mobile-push-send-custommessage.html">Send Custom
* Platform-Specific Payloads in Messages to Mobile Devices</a>.
* </p>
* <important>
* <p>
* You can publish messages only to topics and endpoints in the same AWS Region.
* </p>
* </important>
*
* @param publishRequest Input for Publish action.
* @return Result of the Publish operation returned by the service.
Expand All @@ -74,33 +85,52 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati
* @throws EndpointDisabledException Exception error indicating endpoint disabled.
* @throws PlatformApplicationDisabledException Exception error indicating platform application disabled.
* @throws AuthorizationErrorException Indicates that the user has been denied access to the requested resource.
* @sample AmazonSNS.Publish
* @throws KmsDisabledException The request was rejected because the specified customer master key (CMK) isn't enabled.
* @throws KmsInvalidStateException The request was rejected because the state of the specified resource isn't valid for this request. For
* more information, see <a href="https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html">How
* Key State Affects Use of a Customer Master Key</a> in the <i>AWS Key Management Service Developer
* Guide</i>.
* @throws KmsNotFoundException The request was rejected because the specified entity or resource can't be found.
* @throws KmsOptInRequiredException The AWS access key ID needs a subscription for the service.
* @throws KmsThrottlingException The request was denied due to request throttling. For more information about throttling, see <a
* href="https://docs.aws.amazon.com/kms/latest/developerguide/limits.html#requests-per-second">Limits</a>
* in the <i>AWS Key Management Service Developer Guide.</i>
* @throws KmsAccessDeniedException The ciphertext references a key that doesn't exist or that you don't have access to.
* @throws InvalidSecurityException The credential signature isn't valid. You must use an HTTPS endpoint and sign your request using
* Signature Version 4.
* @throws SdkException Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for
* catch all scenarios.
* @throws SdkClientException If any client side error occurs such as an IO related failure, failure to get credentials, etc.
* @throws SnsException Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type.
* @sample SnsClient.Publish
* @see <a href="http://docs.aws.amazon.com/goto/WebAPI/sns-2010-03-31/Publish" target="_top">AWS API
* Documentation</a>
*/
@Override
public PublishResult publish(PublishRequest publishRequest) {
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.getMessage())) {
public PublishResponse publish(PublishRequest publishRequest) {
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.message())) {
return super.publish(publishRequest);
}

if (!StringUtils.isNullOrEmpty(publishRequest.getMessageStructure()) &&
publishRequest.getMessageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) {
if (!StringUtils.isNullOrEmpty(publishRequest.messageStructure()) &&
publishRequest.messageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) {
String errorMessage = "SNS extended client does not support sending JSON messages.";
LOGGER.error(errorMessage);
throw new AmazonClientException(errorMessage);
}

publishRequest.getRequestClientOptions().appendUserAgent(USER_AGENT_HEADER);
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
publishRequestBuilder.overrideConfiguration(AwsRequestOverrideConfiguration.builder().putHeader(USER_AGENT_HEADER_NAME, USER_AGENT_HEADER).build());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if there was a better way to handle this. Please advise here

publishRequest = publishRequestBuilder.build();

long messageAttributesSize = getMsgAttributesSize(publishRequest.getMessageAttributes());
long messageBodySize = Util.getStringSizeInBytes(publishRequest.getMessage());
long messageAttributesSize = getMsgAttributesSize(publishRequest.messageAttributes());
long messageBodySize = Util.getStringSizeInBytes(publishRequest.message());

if (!shouldExtendedStoreBeUsed(messageAttributesSize + messageBodySize)) {
return super.publish(publishRequest);
}

checkMessageAttributes(publishRequest.getMessageAttributes());
checkMessageAttributes(publishRequest.messageAttributes());
checkSizeOfMessageAttributes(messageAttributesSize);

PublishRequest clonedPublishRequest = copyPublishRequest(publishRequest);
Expand All @@ -109,18 +139,6 @@ public PublishResult publish(PublishRequest publishRequest) {
return super.publish(publishRequest);
}

/**
* Simplified method form for invoking the Publish operation.
*
* @param topicArn
* @param message
* @see #publish(PublishRequest)
*/
@Override
public PublishResult publish(String topicArn, String message) {
return this.publish((new PublishRequest()).withTopicArn(topicArn).withMessage(message));
}

private boolean shouldExtendedStoreBeUsed(long totalMessageSize) {
return snsExtendedClientConfiguration.isAlwaysThroughS3() ||
(snsExtendedClientConfiguration.isPayloadSupportEnabled() && isTotalMessageSizeLargerThanThreshold(totalMessageSize));
Expand Down Expand Up @@ -173,48 +191,53 @@ private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttribute
private long getMessageAttributeSize(String MessageAttributeKey, MessageAttributeValue value) {
long messageAttributeSize = Util.getStringSizeInBytes(MessageAttributeKey);

if (value.getDataType() != null) {
messageAttributeSize += Util.getStringSizeInBytes(value.getDataType());
if (value.dataType() != null) {
messageAttributeSize += Util.getStringSizeInBytes(value.dataType());
}

String stringVal = value.getStringValue();
String stringVal = value.stringValue();
if (stringVal != null) {
messageAttributeSize += Util.getStringSizeInBytes(stringVal);
}

ByteBuffer binaryVal = value.getBinaryValue();
SdkBytes binaryVal = value.binaryValue();
if (binaryVal != null) {
messageAttributeSize += binaryVal.array().length;
messageAttributeSize += binaryVal.asByteArray().length;
}

return messageAttributeSize;
}

private PublishRequest storeMessageInExtendedStore(PublishRequest publishRequest, long messageAttributeSize) {
String messageContentStr = publishRequest.getMessage();
String messageContentStr = publishRequest.message();
Long messageContentSize = Util.getStringSizeInBytes(messageContentStr);

PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr,
messageContentSize);
publishRequest.setMessage(largeMessagePointer);
publishRequestBuilder.message(largeMessagePointer);

MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder();
messageAttributeValueBuilder.dataType("Number");
messageAttributeValueBuilder.stringValue(messageContentSize.toString());
MessageAttributeValue messageAttributeValue = messageAttributeValueBuilder.build();

MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("Number");
messageAttributeValue.setStringValue(messageContentSize.toString());
publishRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes());
attributes.put(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
publishRequestBuilder.messageAttributes(attributes);

messageAttributeSize += getMessageAttributeSize(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
checkSizeOfMessageAttributes(messageAttributeSize);

return publishRequest;
return publishRequestBuilder.build();
}

private PublishRequest copyPublishRequest(PublishRequest publishRequest) {
// We only modify Message and MessageAttributes, to avoid performance impact let's shallow-copy
// the request and then copy the MessageAttributes map.
PublishRequest clonedPublishRequest = publishRequest.clone();
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.getMessageAttributes());
clonedPublishRequest.setMessageAttributes(attributes);
return clonedPublishRequest;
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes());
publishRequestBuilder.messageAttributes(attributes);
return publishRequestBuilder.build();
}
}
Loading