Skip to content

Commit c2ca9b3

Browse files
fix: Prevent message to be marked as success if failed sending to DLQ
1 parent 1773879 commit c2ca9b3

File tree

2 files changed

+56
-15
lines changed

2 files changed

+56
-15
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java

+22-11
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import java.util.List;
77
import java.util.Map;
88
import java.util.Optional;
9-
import java.util.function.Consumer;
9+
import java.util.function.Function;
10+
import java.util.stream.Collectors;
1011
import java.util.stream.IntStream;
1112
import com.fasterxml.jackson.core.JsonProcessingException;
1213
import com.fasterxml.jackson.databind.JsonNode;
@@ -158,17 +159,23 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
158159
})
159160
.collect(toList());
160161

161-
batchRequest(dlqMessages, 10, entriesToSend -> {
162+
List<SendMessageBatchResponse> sendMessageBatchResponses = batchRequest(dlqMessages, 10, entriesToSend -> {
162163

163164
SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(SendMessageBatchRequest.builder()
164-
.entries(entriesToSend)
165-
.queueUrl(dlqUrl.get())
166-
.build());
165+
.entries(entriesToSend)
166+
.queueUrl(dlqUrl.get())
167+
.build());
168+
167169

168170
LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);
171+
172+
return sendMessageBatchResponse;
169173
});
170174

171-
return true;
175+
return sendMessageBatchResponses.stream()
176+
.filter(response -> null != response && response.hasFailed())
177+
.peek(sendMessageBatchResponse -> LOG.error("Failed sending message to the DLQ. Entire batch will be re processed. Check if need permissions are configured for the function. Response: {}", sendMessageBatchResponse))
178+
.count() == 0;
172179
}
173180

174181

@@ -220,17 +227,21 @@ private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
220227
DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);
221228

222229
LOG.debug("Response from delete request {}", deleteMessageBatchResponse);
230+
231+
return deleteMessageBatchResponse;
223232
});
224233
}
225234
}
226235

227-
private <T> void batchRequest(final List<T> listOFEntries,
228-
final int size,
229-
final Consumer<List<T>> batchLogic) {
230-
IntStream.range(0, listOFEntries.size())
236+
private <T, R> List<R> batchRequest(final List<T> listOFEntries,
237+
final int size,
238+
final Function<List<T>, R> batchLogic) {
239+
240+
return IntStream.range(0, listOFEntries.size())
231241
.filter(index -> index % size == 0)
232242
.mapToObj(index -> listOFEntries.subList(index, Math.min(index + size, listOFEntries.size())))
233-
.forEach(batchLogic);
243+
.map(batchLogic)
244+
.collect(Collectors.toList());
234245
}
235246

236247
private String url(String queueArn) {

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package software.amazon.lambda.powertools.sqs.internal;
22

33
import java.io.IOException;
4-
import java.time.LocalDateTime;
54
import java.util.HashMap;
6-
import java.util.function.Consumer;
7-
85
import com.amazonaws.services.lambda.runtime.Context;
96
import com.amazonaws.services.lambda.runtime.RequestHandler;
107
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
@@ -15,11 +12,13 @@
1512
import org.junit.jupiter.api.Test;
1613
import org.mockito.ArgumentCaptor;
1714
import software.amazon.awssdk.services.sqs.SqsClient;
15+
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
1816
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
1917
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
2018
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
2119
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
2220
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
21+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
2322
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
2423
import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
2524
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler;
@@ -30,7 +29,6 @@
3029

3130
import static org.assertj.core.api.Assertions.assertThat;
3231
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
33-
import static org.assertj.core.api.Assertions.in;
3432
import static org.mockito.ArgumentMatchers.any;
3533
import static org.mockito.Mockito.mock;
3634
import static org.mockito.Mockito.never;
@@ -137,6 +135,38 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
137135
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
138136
}
139137

138+
@Test
139+
void shouldBatchProcessAndThrowExceptionForNonRetryableExceptionWhenMoveToDlqReturnFailedResponse() {
140+
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
141+
event.getRecords().get(0).setMessageId("");
142+
143+
when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(SendMessageBatchResponse.builder()
144+
.failed(BatchResultErrorEntry.builder()
145+
.message("Permission Error")
146+
.code("KMS.AccessDeniedException")
147+
.senderFault(true)
148+
.build())
149+
.build());
150+
151+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
152+
153+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
154+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
155+
" \"maxReceiveCount\": 2\n" +
156+
"}");
157+
158+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
159+
.attributes(attributes)
160+
.build());
161+
162+
Assertions.assertThatExceptionOfType(SQSBatchProcessingException.class).
163+
isThrownBy(() -> requestHandler.handleRequest(event, context));
164+
165+
verify(interactionClient).listQueues();
166+
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
167+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
168+
}
169+
140170
@Test
141171
void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() {
142172
requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete();

0 commit comments

Comments
 (0)