-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathSqsMessageSender.java
79 lines (62 loc) · 3.13 KB
/
SqsMessageSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package org.demo.sqs;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.logging.LoggingUtils;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;
public class SqsMessageSender implements RequestHandler<ScheduledEvent, String> {
private static final Logger log = LogManager.getLogger(SqsMessageSender.class);
private static final SqsClient sqsClient = SqsClient.builder()
.httpClient(UrlConnectionHttpClient.create())
.build();
private static final Random random = new SecureRandom();
private static final ObjectMapper objectMapper;
static {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JodaModule());
LoggingUtils.defaultObjectMapper(objectMapper);
}
@Logging(logEvent = true)
public String handleRequest(final ScheduledEvent input, final Context context) {
String queueUrl = System.getenv("QUEUE_URL");
// Push 5 messages on each invoke.
List<SendMessageBatchRequestEntry> batchRequestEntries = IntStream.range(0, 5)
.mapToObj(value -> {
Map<String, MessageAttributeValue> attributeValueHashMap = new HashMap<>();
attributeValueHashMap.put("Key" + value, MessageAttributeValue.builder()
.dataType("String")
.stringValue("Value" + value)
.build());
byte[] array = new byte[7];
random.nextBytes(array);
return SendMessageBatchRequestEntry.builder()
.messageAttributes(attributeValueHashMap)
.id(input.getId() + value)
.messageBody("Sample Message " + value)
.build();
}).collect(toList());
SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(batchRequestEntries)
.build());
log.info("Sent Message {}", sendMessageBatchResponse);
return "Success";
}
}