-
Notifications
You must be signed in to change notification settings - Fork 14
Upgrade to AWS SDK v2 #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
b4326be
Upgrade to AWS SDK v2
mngo87 227ed8b
update comments
mngo87 c5e3a74
update payload offloading and address comments
mngo87 72ba71d
Versions updated in pom.xml (sqs-extended-client)
agummi-amazon fb4d1f2
Merge branch 'main' into sdkv2
agummi-amazon b06cf4d
Update AmazonSNSExtendedClient.java
agummi-amazon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,22 +2,27 @@ | |
|
||
import com.amazon.sqs.javamessaging.SQSExtendedClientConstants; | ||
import com.amazonaws.AmazonClientException; | ||
import com.amazonaws.services.sns.AmazonSNS; | ||
import com.amazonaws.services.sns.model.*; | ||
import com.amazonaws.util.StringUtils; | ||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; | ||
import software.amazon.awssdk.core.SdkBytes; | ||
import software.amazon.awssdk.core.exception.SdkClientException; | ||
import software.amazon.awssdk.core.exception.SdkException; | ||
import software.amazon.awssdk.services.sns.SnsClient; | ||
import software.amazon.awssdk.services.sns.model.*; | ||
import software.amazon.payloadoffloading.*; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase { | ||
static final String MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE = "json"; | ||
static final String USER_AGENT_HEADER_NAME = "User-Agent"; | ||
static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName()); | ||
|
||
private static final Log LOGGER = LogFactory.getLog(AmazonSNSExtendedClient.class); | ||
private static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName()); | ||
private PayloadStore payloadStore; | ||
private SNSExtendedClientConfiguration snsExtendedClientConfiguration; | ||
|
||
|
@@ -34,7 +39,7 @@ public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase { | |
* @param snsExtendedClientConfiguration The sns extended client configuration options controlling the | ||
* functionality of this client. | ||
*/ | ||
public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) { | ||
public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) { | ||
super(snsClient); | ||
|
||
this.snsExtendedClientConfiguration = snsExtendedClientConfiguration; | ||
|
@@ -44,7 +49,8 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati | |
|
||
/** | ||
* <p> | ||
* Sends a message to an Amazon SNS topic or sends a text message (SMS message) directly to a phone number. | ||
* Sends a message to an Amazon SNS topic, a text message (SMS message) directly to a phone number, or a message to | ||
* a mobile platform endpoint (when you specify the <code>TargetArn</code>). | ||
* </p> | ||
* <p> | ||
* If you send a message to a topic, Amazon SNS delivers the message to each endpoint that is subscribed to the | ||
|
@@ -61,9 +67,14 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati | |
* </p> | ||
* <p> | ||
* For more information about formatting messages, see <a | ||
* href="http://docs.aws.amazon.com/sns/latest/dg/mobile-push-send-custommessage.html">Send Custom Platform-Specific | ||
* Payloads in Messages to Mobile Devices</a>. | ||
* href="https://docs.aws.amazon.com/sns/latest/dg/mobile-push-send-custommessage.html">Send Custom | ||
* Platform-Specific Payloads in Messages to Mobile Devices</a>. | ||
* </p> | ||
* <important> | ||
* <p> | ||
* You can publish messages only to topics and endpoints in the same AWS Region. | ||
* </p> | ||
* </important> | ||
* | ||
* @param publishRequest Input for Publish action. | ||
* @return Result of the Publish operation returned by the service. | ||
|
@@ -74,33 +85,52 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati | |
* @throws EndpointDisabledException Exception error indicating endpoint disabled. | ||
* @throws PlatformApplicationDisabledException Exception error indicating platform application disabled. | ||
* @throws AuthorizationErrorException Indicates that the user has been denied access to the requested resource. | ||
* @sample AmazonSNS.Publish | ||
* @throws KmsDisabledException The request was rejected because the specified customer master key (CMK) isn't enabled. | ||
* @throws KmsInvalidStateException The request was rejected because the state of the specified resource isn't valid for this request. For | ||
* more information, see <a href="https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html">How | ||
* Key State Affects Use of a Customer Master Key</a> in the <i>AWS Key Management Service Developer | ||
* Guide</i>. | ||
* @throws KmsNotFoundException The request was rejected because the specified entity or resource can't be found. | ||
* @throws KmsOptInRequiredException The AWS access key ID needs a subscription for the service. | ||
* @throws KmsThrottlingException The request was denied due to request throttling. For more information about throttling, see <a | ||
* href="https://docs.aws.amazon.com/kms/latest/developerguide/limits.html#requests-per-second">Limits</a> | ||
* in the <i>AWS Key Management Service Developer Guide.</i> | ||
* @throws KmsAccessDeniedException The ciphertext references a key that doesn't exist or that you don't have access to. | ||
* @throws InvalidSecurityException The credential signature isn't valid. You must use an HTTPS endpoint and sign your request using | ||
* Signature Version 4. | ||
* @throws SdkException Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for | ||
* catch all scenarios. | ||
* @throws SdkClientException If any client side error occurs such as an IO related failure, failure to get credentials, etc. | ||
* @throws SnsException Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. | ||
* @sample SnsClient.Publish | ||
* @see <a href="http://docs.aws.amazon.com/goto/WebAPI/sns-2010-03-31/Publish" target="_top">AWS API | ||
* Documentation</a> | ||
mngo87 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
@Override | ||
public PublishResult publish(PublishRequest publishRequest) { | ||
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.getMessage())) { | ||
public PublishResponse publish(PublishRequest publishRequest) { | ||
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.message())) { | ||
return super.publish(publishRequest); | ||
} | ||
|
||
if (!StringUtils.isNullOrEmpty(publishRequest.getMessageStructure()) && | ||
publishRequest.getMessageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) { | ||
if (!StringUtils.isNullOrEmpty(publishRequest.messageStructure()) && | ||
publishRequest.messageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) { | ||
String errorMessage = "SNS extended client does not support sending JSON messages."; | ||
LOGGER.error(errorMessage); | ||
throw new AmazonClientException(errorMessage); | ||
} | ||
|
||
publishRequest.getRequestClientOptions().appendUserAgent(USER_AGENT_HEADER); | ||
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder(); | ||
publishRequestBuilder.overrideConfiguration(AwsRequestOverrideConfiguration.builder().putHeader(USER_AGENT_HEADER_NAME, USER_AGENT_HEADER).build()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't sure if there was a better way to handle this. Please advise here |
||
publishRequest = publishRequestBuilder.build(); | ||
|
||
long messageAttributesSize = getMsgAttributesSize(publishRequest.getMessageAttributes()); | ||
long messageBodySize = Util.getStringSizeInBytes(publishRequest.getMessage()); | ||
long messageAttributesSize = getMsgAttributesSize(publishRequest.messageAttributes()); | ||
long messageBodySize = Util.getStringSizeInBytes(publishRequest.message()); | ||
|
||
if (!shouldExtendedStoreBeUsed(messageAttributesSize + messageBodySize)) { | ||
return super.publish(publishRequest); | ||
} | ||
|
||
checkMessageAttributes(publishRequest.getMessageAttributes()); | ||
checkMessageAttributes(publishRequest.messageAttributes()); | ||
checkSizeOfMessageAttributes(messageAttributesSize); | ||
|
||
PublishRequest clonedPublishRequest = copyPublishRequest(publishRequest); | ||
|
@@ -109,18 +139,6 @@ public PublishResult publish(PublishRequest publishRequest) { | |
return super.publish(publishRequest); | ||
} | ||
|
||
/** | ||
* Simplified method form for invoking the Publish operation. | ||
* | ||
* @param topicArn | ||
* @param message | ||
* @see #publish(PublishRequest) | ||
*/ | ||
@Override | ||
public PublishResult publish(String topicArn, String message) { | ||
return this.publish((new PublishRequest()).withTopicArn(topicArn).withMessage(message)); | ||
} | ||
|
||
private boolean shouldExtendedStoreBeUsed(long totalMessageSize) { | ||
return snsExtendedClientConfiguration.isAlwaysThroughS3() || | ||
(snsExtendedClientConfiguration.isPayloadSupportEnabled() && isTotalMessageSizeLargerThanThreshold(totalMessageSize)); | ||
|
@@ -173,48 +191,53 @@ private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttribute | |
private long getMessageAttributeSize(String MessageAttributeKey, MessageAttributeValue value) { | ||
long messageAttributeSize = Util.getStringSizeInBytes(MessageAttributeKey); | ||
|
||
if (value.getDataType() != null) { | ||
messageAttributeSize += Util.getStringSizeInBytes(value.getDataType()); | ||
if (value.dataType() != null) { | ||
messageAttributeSize += Util.getStringSizeInBytes(value.dataType()); | ||
} | ||
|
||
String stringVal = value.getStringValue(); | ||
String stringVal = value.stringValue(); | ||
if (stringVal != null) { | ||
messageAttributeSize += Util.getStringSizeInBytes(stringVal); | ||
} | ||
|
||
ByteBuffer binaryVal = value.getBinaryValue(); | ||
SdkBytes binaryVal = value.binaryValue(); | ||
if (binaryVal != null) { | ||
messageAttributeSize += binaryVal.array().length; | ||
messageAttributeSize += binaryVal.asByteArray().length; | ||
} | ||
|
||
return messageAttributeSize; | ||
} | ||
|
||
private PublishRequest storeMessageInExtendedStore(PublishRequest publishRequest, long messageAttributeSize) { | ||
String messageContentStr = publishRequest.getMessage(); | ||
String messageContentStr = publishRequest.message(); | ||
Long messageContentSize = Util.getStringSizeInBytes(messageContentStr); | ||
|
||
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder(); | ||
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr, | ||
messageContentSize); | ||
publishRequest.setMessage(largeMessagePointer); | ||
publishRequestBuilder.message(largeMessagePointer); | ||
|
||
MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder(); | ||
messageAttributeValueBuilder.dataType("Number"); | ||
messageAttributeValueBuilder.stringValue(messageContentSize.toString()); | ||
MessageAttributeValue messageAttributeValue = messageAttributeValueBuilder.build(); | ||
|
||
MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); | ||
messageAttributeValue.setDataType("Number"); | ||
messageAttributeValue.setStringValue(messageContentSize.toString()); | ||
publishRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); | ||
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes()); | ||
attributes.put(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); | ||
publishRequestBuilder.messageAttributes(attributes); | ||
|
||
messageAttributeSize += getMessageAttributeSize(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); | ||
checkSizeOfMessageAttributes(messageAttributeSize); | ||
|
||
return publishRequest; | ||
return publishRequestBuilder.build(); | ||
} | ||
|
||
private PublishRequest copyPublishRequest(PublishRequest publishRequest) { | ||
// We only modify Message and MessageAttributes, to avoid performance impact let's shallow-copy | ||
// the request and then copy the MessageAttributes map. | ||
PublishRequest clonedPublishRequest = publishRequest.clone(); | ||
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.getMessageAttributes()); | ||
clonedPublishRequest.setMessageAttributes(attributes); | ||
return clonedPublishRequest; | ||
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder(); | ||
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes()); | ||
publishRequestBuilder.messageAttributes(attributes); | ||
return publishRequestBuilder.build(); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.