Skip to content

Upgrade to AWS SDK v2 #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>software.amazon.sns</groupId>
<artifactId>sns-extended-client</artifactId>
<version>1.0.0</version>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>Amazon SNS Extended Client Library for Java</name>
<description>An extension to the Amazon SNS client that enables sending messages up to 2GB via Amazon S3.
Expand All @@ -26,7 +26,7 @@
</licenses>

<properties>
<aws-java-sdk.version>1.11.817</aws-java-sdk.version>
<aws-java-sdk.version>2.14.19</aws-java-sdk.version>
</properties>

<developers>
Expand All @@ -53,13 +53,13 @@
<type>jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
Expand Down
80 changes: 39 additions & 41 deletions src/main/java/software/amazon/sns/AmazonSNSExtendedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@

import com.amazon.sqs.javamessaging.SQSExtendedClientConstants;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;
import software.amazon.payloadoffloading.*;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase {
static final String MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE = "json";
static final String USER_AGENT_HEADER_NAME = "User-Agent";
static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName());

private static final Log LOGGER = LogFactory.getLog(AmazonSNSExtendedClient.class);
private static final String USER_AGENT_HEADER = Util.getUserAgentHeader(AmazonSNSExtendedClient.class.getSimpleName());
private PayloadStore payloadStore;
private SNSExtendedClientConfiguration snsExtendedClientConfiguration;

Expand All @@ -34,7 +37,7 @@ public class AmazonSNSExtendedClient extends AmazonSNSExtendedClientBase {
* @param snsExtendedClientConfiguration The sns extended client configuration options controlling the
* functionality of this client.
*/
public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) {
public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfiguration snsExtendedClientConfiguration) {
super(snsClient);

this.snsExtendedClientConfiguration = snsExtendedClientConfiguration;
Expand Down Expand Up @@ -79,28 +82,30 @@ public AmazonSNSExtendedClient(AmazonSNS snsClient, SNSExtendedClientConfigurati
* Documentation</a>
*/
@Override
public PublishResult publish(PublishRequest publishRequest) {
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.getMessage())) {
public PublishResponse publish(PublishRequest publishRequest) {
if (publishRequest == null || StringUtils.isNullOrEmpty(publishRequest.message())) {
return super.publish(publishRequest);
}

if (!StringUtils.isNullOrEmpty(publishRequest.getMessageStructure()) &&
publishRequest.getMessageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) {
if (!StringUtils.isNullOrEmpty(publishRequest.messageStructure()) &&
publishRequest.messageStructure().equals(MULTIPLE_PROTOCOL_MESSAGE_STRUCTURE)) {
String errorMessage = "SNS extended client does not support sending JSON messages.";
LOGGER.error(errorMessage);
throw new AmazonClientException(errorMessage);
}

publishRequest.getRequestClientOptions().appendUserAgent(USER_AGENT_HEADER);
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
publishRequestBuilder.overrideConfiguration(AwsRequestOverrideConfiguration.builder().putHeader(USER_AGENT_HEADER_NAME, USER_AGENT_HEADER).build());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if there was a better way to handle this. Please advise here

publishRequest = publishRequestBuilder.build();

long messageAttributesSize = getMsgAttributesSize(publishRequest.getMessageAttributes());
long messageBodySize = Util.getStringSizeInBytes(publishRequest.getMessage());
long messageAttributesSize = getMsgAttributesSize(publishRequest.messageAttributes());
long messageBodySize = Util.getStringSizeInBytes(publishRequest.message());

if (!shouldExtendedStoreBeUsed(messageAttributesSize + messageBodySize)) {
return super.publish(publishRequest);
}

checkMessageAttributes(publishRequest.getMessageAttributes());
checkMessageAttributes(publishRequest.messageAttributes());
checkSizeOfMessageAttributes(messageAttributesSize);

PublishRequest clonedPublishRequest = copyPublishRequest(publishRequest);
Expand All @@ -109,18 +114,6 @@ public PublishResult publish(PublishRequest publishRequest) {
return super.publish(publishRequest);
}

/**
* Simplified method form for invoking the Publish operation.
*
* @param topicArn
* @param message
* @see #publish(PublishRequest)
*/
@Override
public PublishResult publish(String topicArn, String message) {
return this.publish((new PublishRequest()).withTopicArn(topicArn).withMessage(message));
}

private boolean shouldExtendedStoreBeUsed(long totalMessageSize) {
return snsExtendedClientConfiguration.isAlwaysThroughS3() ||
(snsExtendedClientConfiguration.isPayloadSupportEnabled() && isTotalMessageSizeLargerThanThreshold(totalMessageSize));
Expand Down Expand Up @@ -173,48 +166,53 @@ private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttribute
private long getMessageAttributeSize(String MessageAttributeKey, MessageAttributeValue value) {
long messageAttributeSize = Util.getStringSizeInBytes(MessageAttributeKey);

if (value.getDataType() != null) {
messageAttributeSize += Util.getStringSizeInBytes(value.getDataType());
if (value.dataType() != null) {
messageAttributeSize += Util.getStringSizeInBytes(value.dataType());
}

String stringVal = value.getStringValue();
String stringVal = value.stringValue();
if (stringVal != null) {
messageAttributeSize += Util.getStringSizeInBytes(stringVal);
}

ByteBuffer binaryVal = value.getBinaryValue();
SdkBytes binaryVal = value.binaryValue();
if (binaryVal != null) {
messageAttributeSize += binaryVal.array().length;
messageAttributeSize += binaryVal.asByteArray().length;
}

return messageAttributeSize;
}

private PublishRequest storeMessageInExtendedStore(PublishRequest publishRequest, long messageAttributeSize) {
String messageContentStr = publishRequest.getMessage();
String messageContentStr = publishRequest.message();
Long messageContentSize = Util.getStringSizeInBytes(messageContentStr);

PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr,
messageContentSize);
publishRequest.setMessage(largeMessagePointer);
publishRequestBuilder.message(largeMessagePointer);

MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder();
messageAttributeValueBuilder.dataType("Number");
messageAttributeValueBuilder.stringValue(messageContentSize.toString());
MessageAttributeValue messageAttributeValue = messageAttributeValueBuilder.build();

MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("Number");
messageAttributeValue.setStringValue(messageContentSize.toString());
publishRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes());
attributes.put(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
publishRequestBuilder.messageAttributes(attributes);

messageAttributeSize += getMessageAttributeSize(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);
checkSizeOfMessageAttributes(messageAttributeSize);

return publishRequest;
return publishRequestBuilder.build();
}

private PublishRequest copyPublishRequest(PublishRequest publishRequest) {
// We only modify Message and MessageAttributes, to avoid performance impact let's shallow-copy
// the request and then copy the MessageAttributes map.
PublishRequest clonedPublishRequest = publishRequest.clone();
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.getMessageAttributes());
clonedPublishRequest.setMessageAttributes(attributes);
return clonedPublishRequest;
PublishRequest.Builder publishRequestBuilder = publishRequest.toBuilder();
Map<String, MessageAttributeValue> attributes = new HashMap<>(publishRequest.messageAttributes());
publishRequestBuilder.messageAttributes(attributes);
return publishRequestBuilder.build();
}
}
Loading