Skip to content

Commit 1a074e7

Browse files
author
Pankaj Agrawal
committed
chore(performance): Build queue url from queue arn instead of API call
1 parent 473c11b commit 1a074e7

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java

+22-20
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
1919
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
2020
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
21-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
2221
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
2322
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
2423
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
@@ -28,12 +27,12 @@
2827

2928
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
3029
import static java.lang.String.format;
30+
import static java.util.Optional.ofNullable;
3131
import static java.util.stream.Collectors.toList;
3232

3333
public final class BatchContext {
3434
private static final Logger LOG = LoggerFactory.getLogger(BatchContext.class);
35-
private static final Map<String, String> queueArnToQueueUrlMapping = new HashMap<>();
36-
private static final Map<String, String> queueArnToDlqUrlMapping = new HashMap<>();
35+
private static final Map<String, String> QUEUE_ARN_TO_DLQ_URL_MAPPING = new HashMap<>();
3736

3837
private final Map<SQSMessage, Exception> messageToException = new HashMap<>();
3938
private final List<SQSMessage> success = new ArrayList<>();
@@ -91,6 +90,10 @@ public final <T> void processSuccessAndHandleFailed(final List<T> successReturns
9190

9291
deleteMessagesFromQueue(messagesToBeDeleted);
9392

93+
if (failedMessages.isEmpty()) {
94+
return;
95+
}
96+
9497
if (suppressException) {
9598
List<String> messageIds = failedMessages.stream().
9699
map(SQSMessage::getMessageId)
@@ -148,21 +151,27 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
148151
private Optional<String> fetchDlqUrl(Map<SQSMessage, Exception> nonRetryableMessageToException) {
149152
return nonRetryableMessageToException.keySet().stream()
150153
.findFirst()
151-
.map(sqsMessage -> queueArnToDlqUrlMapping.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
154+
.map(sqsMessage -> QUEUE_ARN_TO_DLQ_URL_MAPPING.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
152155
String queueUrl = url(sourceArn);
153156

154157
GetQueueAttributesResponse queueAttributes = client.getQueueAttributes(GetQueueAttributesRequest.builder()
155158
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
156159
.queueUrl(queueUrl)
157160
.build());
158161

159-
try {
160-
JsonNode jsonNode = SqsUtils.objectMapper().readTree(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY));
161-
return url(jsonNode.get("deadLetterTargetArn").asText());
162-
} catch (JsonProcessingException e) {
163-
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
164-
return null;
165-
}
162+
return ofNullable(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY))
163+
.map(policy -> {
164+
try {
165+
return SqsUtils.objectMapper().readTree(policy);
166+
} catch (JsonProcessingException e) {
167+
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
168+
return null;
169+
}
170+
})
171+
.map(node -> node.get("deadLetterTargetArn"))
172+
.map(JsonNode::asText)
173+
.map(this::url)
174+
.orElse(null);
166175
}));
167176
}
168177

@@ -186,14 +195,7 @@ private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
186195
}
187196

188197
private String url(String queueArn) {
189-
return queueArnToQueueUrlMapping.computeIfAbsent(queueArn, s -> {
190-
String[] arnArray = queueArn.split(":");
191-
192-
return client.getQueueUrl(GetQueueUrlRequest.builder()
193-
.queueOwnerAWSAccountId(arnArray[4])
194-
.queueName(arnArray[5])
195-
.build())
196-
.queueUrl();
197-
});
198+
String[] arnArray = queueArn.split(":");
199+
return String.format("https://sqs.%s.amazonaws.com/%s/%s", arnArray[3], arnArray[4], arnArray[5]);
198200
}
199201
}

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java

+24-12
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
package software.amazon.lambda.powertools.sqs;
22

33
import java.io.IOException;
4+
import java.util.HashMap;
45
import java.util.List;
6+
import java.util.function.Consumer;
57

68
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
79
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import org.assertj.core.api.Assertions;
811
import org.junit.jupiter.api.BeforeEach;
912
import org.junit.jupiter.api.Test;
1013
import org.junit.jupiter.params.ParameterizedTest;
1114
import org.junit.jupiter.params.provider.ValueSource;
1215
import org.mockito.ArgumentCaptor;
1316
import software.amazon.awssdk.services.sqs.SqsClient;
1417
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
15-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
16-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
18+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
19+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
20+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
1721

1822
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
1923
import static org.assertj.core.api.Assertions.assertThat;
@@ -39,11 +43,6 @@ class SqsUtilsBatchProcessorTest {
3943
void setUp() throws IOException {
4044
reset(sqsClient, interactionClient);
4145
event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class);
42-
43-
when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder()
44-
.queueUrl("test")
45-
.build());
46-
4746
overrideSqsClient(sqsClient);
4847
}
4948

@@ -107,14 +106,12 @@ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() {
107106
});
108107

109108
verify(interactionClient).listQueues();
110-
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
111109

112-
ArgumentCaptor<GetQueueUrlRequest> captor = ArgumentCaptor.forClass(GetQueueUrlRequest.class);
113-
verify(sqsClient).getQueueUrl(captor.capture());
110+
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
111+
verify(sqsClient).deleteMessageBatch(captor.capture());
114112

115113
assertThat(captor.getValue())
116-
.hasFieldOrPropertyWithValue("queueName", "my-queue")
117-
.hasFieldOrPropertyWithValue("queueOwnerAWSAccountId", "123456789012");
114+
.hasFieldOrPropertyWithValue("queueUrl", "https://sqs.us-east-2.amazonaws.com/123456789012/my-queue");
118115
}
119116

120117
@Test
@@ -219,6 +216,16 @@ public String process(SQSMessage message) {
219216
@Test
220217
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
221218
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da";
219+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
220+
221+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
222+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
223+
" \"maxReceiveCount\": 2\n" +
224+
"}");
225+
226+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
227+
.attributes(attributes)
228+
.build());
222229

223230
List<String> batchProcessor = batchProcessor(event, (message) -> {
224231
if (failedId.equals(message.getMessageId())) {
@@ -228,6 +235,11 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
228235
interactionClient.listQueues();
229236
return "Success";
230237
}, IllegalStateException.class, IllegalArgumentException.class);
238+
239+
Assertions.assertThat(batchProcessor)
240+
.hasSize(1);
241+
242+
verify(sqsClient).sendMessageBatch(any(Consumer.class));
231243
}
232244

233245
public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java

+29-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package software.amazon.lambda.powertools.sqs.internal;
22

33
import java.io.IOException;
4+
import java.util.HashMap;
45
import java.util.Random;
6+
import java.util.function.Consumer;
57

68
import com.amazonaws.services.lambda.runtime.Context;
79
import com.amazonaws.services.lambda.runtime.RequestHandler;
@@ -12,13 +14,15 @@
1214
import org.junit.jupiter.api.Test;
1315
import software.amazon.awssdk.services.sqs.SqsClient;
1416
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
15-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
16-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
17+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
18+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
19+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
1720
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
1821
import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
1922
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler;
2023
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler;
2124
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler;
25+
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandler;
2226

2327
import static org.assertj.core.api.Assertions.assertThat;
2428
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -48,11 +52,6 @@ void setUp() throws IOException {
4852
reset(sqsClient);
4953
setupContext();
5054
event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class);
51-
52-
when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder()
53-
.queueUrl("test")
54-
.build());
55-
5655
requestHandler = new PartialBatchSuccessHandler();
5756
}
5857

@@ -109,6 +108,29 @@ void shouldNotTakeEffectOnNonSqsEventHandler() {
109108
verifyNoInteractions(sqsClient);
110109
}
111110

111+
@Test
112+
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
113+
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
114+
event.getRecords().get(0).setMessageId("");
115+
116+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
117+
118+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
119+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
120+
" \"maxReceiveCount\": 2\n" +
121+
"}");
122+
123+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
124+
.attributes(attributes)
125+
.build());
126+
127+
requestHandler.handleRequest(event, context);
128+
129+
verify(mockedRandom).nextInt();
130+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
131+
verify(sqsClient).sendMessageBatch(any(Consumer.class));
132+
}
133+
112134
private void setupContext() {
113135
when(context.getFunctionName()).thenReturn("testFunction");
114136
when(context.getInvokedFunctionArn()).thenReturn("testArn");

0 commit comments

Comments
 (0)