@@ -47,6 +47,28 @@ public AmazonSNSExtendedClient(SnsClient snsClient, SNSExtendedClientConfigurati
47
47
this .payloadStore = new S3BackedPayloadStore (s3Dao , this .snsExtendedClientConfiguration .getS3BucketName ());
48
48
}
49
49
50
+ /**
51
+ * Constructs a new Amazon SNS extended client to invoke service methods on
52
+ * Amazon SNS with extended functionality using the specified Amazon SNS
53
+ * client object and Payload Store object.
54
+ * <p>
55
+ * <p>
56
+ * All service calls made using this new client object are blocking, and
57
+ * will not return until the service call completes.
58
+ *
59
+ * @param snsClient The Amazon SNS client to use to connect to Amazon SNS.
60
+ * @param snsExtendedClientConfiguration The sns extended client configuration options controlling the
61
+ * functionality of this client.
62
+ * @param payloadStore The Payload Store that handles logic for saving to the desired
63
+ * extended storage.
64
+ */
65
+ public AmazonSNSExtendedClient (AmazonSNS snsClient , SNSExtendedClientConfiguration snsExtendedClientConfiguration , PayloadStore payloadStore ) {
66
+ super (snsClient );
67
+
68
+ this .snsExtendedClientConfiguration = snsExtendedClientConfiguration ;
69
+ this .payloadStore = payloadStore ;
70
+ }
71
+
50
72
/**
51
73
* <p>
52
74
* Sends a message to an Amazon SNS topic, a text message (SMS message) directly to a phone number, or a message to
@@ -212,9 +234,18 @@ private long getMessageAttributeSize(String MessageAttributeKey, MessageAttribut
212
234
return messageAttributeSize ;
213
235
}
214
236
237
+ private static String getS3keyAttribute (Map <String , MessageAttributeValue > messageAttributes ) {
238
+ if (messageAttributes != null && messageAttributes .containsKey (S3_KEY )) {
239
+ MessageAttributeValue attributeS3KeyValue = messageAttributes .get (S3_KEY );
240
+ return (attributeS3KeyValue == null ) ? null : attributeS3KeyValue .getStringValue ();
241
+ }
242
+ return null ;
243
+ }
244
+
215
245
private PublishRequest storeMessageInExtendedStore (PublishRequest publishRequest , long messageAttributeSize ) {
216
246
String messageContentStr = publishRequest .message ();
217
247
Long messageContentSize = Util .getStringSizeInBytes (messageContentStr );
248
+ String s3Key = getS3keyAttribute (publishRequest .getMessageAttributes ()) ;
218
249
219
250
PublishRequest .Builder publishRequestBuilder = publishRequest .toBuilder ();
220
251
String largeMessagePointer = payloadStore .storeOriginalPayload (messageContentStr );
@@ -225,6 +256,7 @@ private PublishRequest storeMessageInExtendedStore(PublishRequest publishRequest
225
256
messageAttributeValueBuilder .stringValue (messageContentSize .toString ());
226
257
MessageAttributeValue messageAttributeValue = messageAttributeValueBuilder .build ();
227
258
259
+
228
260
Map <String , MessageAttributeValue > attributes = new HashMap <>(publishRequest .messageAttributes ());
229
261
attributes .put (SQSExtendedClientConstants .RESERVED_ATTRIBUTE_NAME , messageAttributeValue );
230
262
publishRequestBuilder .messageAttributes (attributes );
0 commit comments