Skip to content

Commit 242736b

Browse files
author
Pankaj Agrawal
committed
feat(batch-processing): Support for moving non retryable msg to DLQ
1 parent 7ec6fd3 commit 242736b

File tree

6 files changed

+232
-29
lines changed

6 files changed

+232
-29
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatch.java

+4
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,8 @@
5959
Class<? extends SqsMessageHandler<Object>> value();
6060

6161
boolean suppressException() default false;
62+
63+
Class<? extends Exception>[] nonRetryableExceptions() default {};
64+
65+
boolean deleteNonRetryableMessageFromQueue() default false;
6266
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
131131
return batchProcessor(event, false, handler);
132132
}
133133

134+
@SafeVarargs
135+
public static <R> List<R> batchProcessor(final SQSEvent event,
136+
final Class<? extends SqsMessageHandler<R>> handler,
137+
final Class<? extends Exception>... nonRetryableExceptions) {
138+
return batchProcessor(event, false, handler, nonRetryableExceptions);
139+
}
140+
134141
/**
135142
* This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent}
136143
*
@@ -166,6 +173,16 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
166173
return batchProcessor(event, suppressException, handlerInstance);
167174
}
168175

176+
@SafeVarargs
177+
public static <R> List<R> batchProcessor(final SQSEvent event,
178+
final boolean suppressException,
179+
final Class<? extends SqsMessageHandler<R>> handler,
180+
final Class<? extends Exception>... nonRetryableExceptions) {
181+
182+
SqsMessageHandler<R> handlerInstance = instantiatedHandler(handler);
183+
return batchProcessor(event, suppressException, handlerInstance, false, nonRetryableExceptions);
184+
}
185+
169186
/**
170187
* This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent}
171188
*
@@ -199,6 +216,14 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
199216
return batchProcessor(event, false, handler);
200217
}
201218

219+
@SafeVarargs
220+
public static <R> List<R> batchProcessor(final SQSEvent event,
221+
final SqsMessageHandler<R> handler,
222+
final Class<? extends Exception>... nonRetryableExceptions) {
223+
return batchProcessor(event, false, handler, false, nonRetryableExceptions);
224+
}
225+
226+
202227
/**
203228
* This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent}
204229
*
@@ -229,6 +254,16 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
229254
public static <R> List<R> batchProcessor(final SQSEvent event,
230255
final boolean suppressException,
231256
final SqsMessageHandler<R> handler) {
257+
return batchProcessor(event, suppressException, handler, false);
258+
259+
}
260+
261+
@SafeVarargs
262+
public static <R> List<R> batchProcessor(final SQSEvent event,
263+
final boolean suppressException,
264+
final SqsMessageHandler<R> handler,
265+
final boolean deleteNonRetryableMessageFromQueue,
266+
final Class<? extends Exception>... nonRetryableExceptions) {
232267
final List<R> handlerReturn = new ArrayList<>();
233268

234269
if(client == null) {
@@ -246,7 +281,7 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
246281
}
247282
}
248283

249-
batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException);
284+
batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
250285

251286
return handlerReturn;
252287
}
@@ -255,12 +290,12 @@ private static <R> SqsMessageHandler<R> instantiatedHandler(final Class<? extend
255290

256291
try {
257292
if (null == handler.getDeclaringClass()) {
258-
return handler.newInstance();
293+
return handler.getDeclaredConstructor().newInstance();
259294
}
260295

261296
final Constructor<? extends SqsMessageHandler<R>> constructor = handler.getDeclaredConstructor(handler.getDeclaringClass());
262297
constructor.setAccessible(true);
263-
return constructor.newInstance(handler.getDeclaringClass().newInstance());
298+
return constructor.newInstance(handler.getDeclaringClass().getDeclaredConstructor().newInstance());
264299
} catch (Exception e) {
265300
LOG.error("Failed creating handler instance", e);
266301
throw new RuntimeException("Unexpected error occurred. Please raise issue at " +
@@ -276,4 +311,8 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
276311
throw new RuntimeException(e);
277312
}
278313
}
314+
315+
public static ObjectMapper objectMapper() {
316+
return objectMapper;
317+
}
279318
}
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,43 @@
11
package software.amazon.lambda.powertools.sqs.internal;
22

3-
import org.slf4j.Logger;
4-
import org.slf4j.LoggerFactory;
5-
63
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.HashMap;
76
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Optional;
89

10+
import com.fasterxml.jackson.core.JsonProcessingException;
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import software.amazon.awssdk.core.SdkBytes;
915
import software.amazon.awssdk.services.sqs.SqsClient;
1016
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
1117
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
1218
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
19+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
20+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
1321
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
22+
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
23+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
24+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
25+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
1426
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
27+
import software.amazon.lambda.powertools.sqs.SqsUtils;
1528

1629
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
1730
import static java.lang.String.format;
1831
import static java.util.stream.Collectors.toList;
1932

2033
public final class BatchContext {
2134
private static final Logger LOG = LoggerFactory.getLogger(BatchContext.class);
35+
private static final Map<String, String> queueArnToQueueUrlMapping = new HashMap<>();
36+
private static final Map<String, String> queueArnToDlqUrlMapping = new HashMap<>();
2237

38+
private final Map<SQSMessage, Exception> messageToException = new HashMap<>();
2339
private final List<SQSMessage> success = new ArrayList<>();
24-
private final List<SQSMessage> failures = new ArrayList<>();
25-
private final List<Exception> exceptions = new ArrayList<>();
40+
2641
private final SqsClient client;
2742

2843
public BatchContext(SqsClient client) {
@@ -34,37 +49,132 @@ public void addSuccess(SQSMessage event) {
3449
}
3550

3651
public void addFailure(SQSMessage event, Exception e) {
37-
failures.add(event);
38-
exceptions.add(e);
52+
messageToException.put(event, e);
3953
}
4054

41-
public <T> void processSuccessAndHandleFailed(final List<T> successReturns,
42-
final boolean suppressException) {
55+
@SafeVarargs
56+
public final <T> void processSuccessAndHandleFailed(final List<T> successReturns,
57+
final boolean suppressException,
58+
final boolean deleteNonRetryableMessageFromQueue,
59+
final Class<? extends Exception>... nonRetryableExceptions) {
4360
if (hasFailures()) {
44-
deleteSuccessMessage();
61+
62+
List<Exception> exceptions = new ArrayList<>();
63+
List<SQSMessage> failedMessages = new ArrayList<>();
64+
Map<SQSMessage, Exception> nonRetryableMessageToException = new HashMap<>();
65+
66+
messageToException.forEach((sqsMessage, exception) -> {
67+
boolean nonRetryableMessage = Arrays.stream(nonRetryableExceptions)
68+
.anyMatch(aClass -> aClass.isInstance(exception));
69+
70+
if (nonRetryableMessage) {
71+
nonRetryableMessageToException.put(sqsMessage, exception);
72+
} else {
73+
exceptions.add(exception);
74+
failedMessages.add(sqsMessage);
75+
}
76+
});
77+
78+
List<SQSMessage> messagesToBeDeleted = new ArrayList<>(success);
79+
80+
if (!nonRetryableMessageToException.isEmpty() && deleteNonRetryableMessageFromQueue) {
81+
messagesToBeDeleted.addAll(nonRetryableMessageToException.keySet());
82+
} else if (!nonRetryableMessageToException.isEmpty()) {
83+
84+
boolean isMovedToDlq = moveNonRetryableMessagesToDlqIfConfigured(nonRetryableMessageToException);
85+
86+
if (!isMovedToDlq) {
87+
exceptions.addAll(nonRetryableMessageToException.values());
88+
failedMessages.addAll(nonRetryableMessageToException.keySet());
89+
}
90+
}
91+
92+
deleteMessagesFromQueue(messagesToBeDeleted);
4593

4694
if (suppressException) {
47-
List<String> messageIds = failures.stream().
95+
List<String> messageIds = failedMessages.stream().
4896
map(SQSMessage::getMessageId)
4997
.collect(toList());
5098

5199
LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " +
52-
"Failed messages %s", failures.size(), messageIds));
100+
"Failed messages %s", failedMessages.size(), messageIds));
53101
} else {
54-
throw new SQSBatchProcessingException(exceptions, failures, successReturns);
102+
throw new SQSBatchProcessingException(exceptions, failedMessages, successReturns);
55103
}
56104
}
57105
}
58106

107+
private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Exception> nonRetryableMessageToException) {
108+
Optional<String> dlqUrl = fetchDlqUrl(nonRetryableMessageToException);
109+
110+
if (!dlqUrl.isPresent()) {
111+
return false;
112+
}
113+
114+
List<SendMessageBatchRequestEntry> dlqMessages = nonRetryableMessageToException.keySet().stream()
115+
.map(sqsMessage -> {
116+
Map<String, MessageAttributeValue> messageAttributesMap = new HashMap<>();
117+
118+
sqsMessage.getMessageAttributes().forEach((s, messageAttribute) -> {
119+
MessageAttributeValue.Builder builder = MessageAttributeValue.builder();
120+
121+
builder
122+
.dataType(messageAttribute.getDataType())
123+
.stringValue(messageAttribute.getStringValue());
124+
125+
if (null != messageAttribute.getBinaryValue()) {
126+
builder.binaryValue(SdkBytes.fromByteBuffer(messageAttribute.getBinaryValue()));
127+
}
128+
129+
messageAttributesMap.put(s, builder.build());
130+
});
131+
132+
return SendMessageBatchRequestEntry.builder()
133+
.messageBody(sqsMessage.getBody())
134+
.id(sqsMessage.getMessageId())
135+
.messageAttributes(messageAttributesMap)
136+
.build();
137+
})
138+
.collect(toList());
139+
140+
SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(builder -> builder.queueUrl(dlqUrl.get())
141+
.entries(dlqMessages));
142+
143+
LOG.debug(format("Response from send batch message to DLQ request %s", sendMessageBatchResponse));
144+
145+
return true;
146+
}
147+
148+
private Optional<String> fetchDlqUrl(Map<SQSMessage, Exception> nonRetryableMessageToException) {
149+
return nonRetryableMessageToException.keySet().stream()
150+
.findFirst()
151+
.map(sqsMessage -> queueArnToDlqUrlMapping.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
152+
String queueUrl = url(sourceArn);
153+
154+
GetQueueAttributesResponse queueAttributes = client.getQueueAttributes(GetQueueAttributesRequest.builder()
155+
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
156+
.queueUrl(queueUrl)
157+
.build());
158+
159+
try {
160+
JsonNode jsonNode = SqsUtils.objectMapper().readTree(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY));
161+
return url(jsonNode.get("deadLetterTargetArn").asText());
162+
} catch (JsonProcessingException e) {
163+
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
164+
return null;
165+
}
166+
}));
167+
}
168+
59169
private boolean hasFailures() {
60-
return !failures.isEmpty();
170+
return !messageToException.isEmpty();
61171
}
62172

63-
private void deleteSuccessMessage() {
64-
if (!success.isEmpty()) {
173+
private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
174+
if (!messages.isEmpty()) {
65175
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
66-
.queueUrl(url())
67-
.entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
176+
.queueUrl(url(messages.get(0).getEventSourceArn()))
177+
.entries(messages.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
68178
.id(m.getMessageId())
69179
.receiptHandle(m.getReceiptHandle())
70180
.build()).collect(toList()))
@@ -75,12 +185,15 @@ private void deleteSuccessMessage() {
75185
}
76186
}
77187

78-
private String url() {
79-
String[] arnArray = success.get(0).getEventSourceArn().split(":");
80-
return client.getQueueUrl(GetQueueUrlRequest.builder()
81-
.queueOwnerAWSAccountId(arnArray[4])
82-
.queueName(arnArray[5])
83-
.build())
84-
.queueUrl();
188+
private String url(String queueArn) {
189+
return queueArnToQueueUrlMapping.computeIfAbsent(queueArn, s -> {
190+
String[] arnArray = queueArn.split(":");
191+
192+
return client.getQueueUrl(GetQueueUrlRequest.builder()
193+
.queueOwnerAWSAccountId(arnArray[4])
194+
.queueName(arnArray[5])
195+
.build())
196+
.queueUrl();
197+
});
85198
}
86199
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
2929

3030
SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];
3131

32-
batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value());
32+
batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value(), sqsBatch.nonRetryableExceptions());
3333
}
3434

3535
return pjp.proceed(proceedArgs);

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,20 @@ public String process(SQSMessage message) {
216216
}
217217
}
218218

219+
@Test
220+
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
221+
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da";
222+
223+
List<String> batchProcessor = batchProcessor(event, (message) -> {
224+
if (failedId.equals(message.getMessageId())) {
225+
throw new IllegalStateException("Failed processing");
226+
}
227+
228+
interactionClient.listQueues();
229+
return "Success";
230+
}, IllegalStateException.class, IllegalArgumentException.class);
231+
}
232+
219233
public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {
220234
@Override
221235
public String process(SQSEvent.SQSMessage message) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package software.amazon.lambda.powertools.sqs.handlers;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import software.amazon.lambda.powertools.sqs.SqsBatch;
7+
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
8+
9+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
10+
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom;
11+
12+
public class SqsMessageHandlerWithNonRetryableHandler implements RequestHandler<SQSEvent, String> {
13+
14+
@Override
15+
@SqsBatch(value = InnerMessageHandler.class, nonRetryableExceptions = {IllegalStateException.class, IllegalArgumentException.class})
16+
public String handleRequest(final SQSEvent sqsEvent,
17+
final Context context) {
18+
return "Success";
19+
}
20+
21+
private class InnerMessageHandler implements SqsMessageHandler<Object> {
22+
23+
@Override
24+
public String process(SQSMessage message) {
25+
if(message.getMessageId().isEmpty()) {
26+
throw new IllegalArgumentException("Invalid message and was moved to DLQ");
27+
}
28+
29+
mockedRandom.nextInt();
30+
return "Success";
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)