Skip to content

Commit 9ba540f

Browse files
author
Pankaj Agrawal
committed
feat(batch-processing): test cases
1 parent 1d08539 commit 9ba540f

File tree

8 files changed

+334
-26
lines changed

8 files changed

+334
-26
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java

+11
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,17 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
183183
return batchProcessor(event, suppressException, handlerInstance, false, nonRetryableExceptions);
184184
}
185185

186+
@SafeVarargs
187+
public static <R> List<R> batchProcessor(final SQSEvent event,
188+
final boolean suppressException,
189+
final Class<? extends SqsMessageHandler<R>> handler,
190+
final boolean deleteNonRetryableMessageFromQueue,
191+
final Class<? extends Exception>... nonRetryableExceptions) {
192+
193+
SqsMessageHandler<R> handlerInstance = instantiatedHandler(handler);
194+
return batchProcessor(event, suppressException, handlerInstance, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
195+
}
196+
186197
/**
187198
* This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent}
188199
*

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java

+41-25
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,21 @@ public final <T> void processSuccessAndHandleFailed(final List<T> successReturns
6262
List<SQSMessage> failedMessages = new ArrayList<>();
6363
Map<SQSMessage, Exception> nonRetryableMessageToException = new HashMap<>();
6464

65-
messageToException.forEach((sqsMessage, exception) -> {
66-
boolean nonRetryableMessage = Arrays.stream(nonRetryableExceptions)
67-
.anyMatch(aClass -> aClass.isInstance(exception));
68-
69-
if (nonRetryableMessage) {
70-
nonRetryableMessageToException.put(sqsMessage, exception);
71-
} else {
72-
exceptions.add(exception);
73-
failedMessages.add(sqsMessage);
74-
}
75-
});
65+
if (nonRetryableExceptions.length == 0) {
66+
exceptions.addAll(messageToException.values());
67+
failedMessages.addAll(messageToException.keySet());
68+
} else {
69+
messageToException.forEach((sqsMessage, exception) -> {
70+
boolean nonRetryableException = isNonRetryableException(exception, nonRetryableExceptions);
71+
72+
if (nonRetryableException) {
73+
nonRetryableMessageToException.put(sqsMessage, exception);
74+
} else {
75+
exceptions.add(exception);
76+
failedMessages.add(sqsMessage);
77+
}
78+
});
79+
}
7680

7781
List<SQSMessage> messagesToBeDeleted = new ArrayList<>(success);
7882

@@ -90,23 +94,35 @@ public final <T> void processSuccessAndHandleFailed(final List<T> successReturns
9094

9195
deleteMessagesFromQueue(messagesToBeDeleted);
9296

93-
if (failedMessages.isEmpty()) {
94-
return;
95-
}
97+
processFailedMessages(successReturns, suppressException, exceptions, failedMessages);
98+
}
99+
}
96100

97-
if (suppressException) {
98-
List<String> messageIds = failedMessages.stream().
99-
map(SQSMessage::getMessageId)
100-
.collect(toList());
101+
private <T> void processFailedMessages(List<T> successReturns,
102+
boolean suppressException,
103+
List<Exception> exceptions,
104+
List<SQSMessage> failedMessages) {
105+
if (failedMessages.isEmpty()) {
106+
return;
107+
}
101108

102-
LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " +
103-
"Failed messages %s", failedMessages.size(), messageIds));
104-
} else {
105-
throw new SQSBatchProcessingException(exceptions, failedMessages, successReturns);
106-
}
109+
if (suppressException) {
110+
List<String> messageIds = failedMessages.stream().
111+
map(SQSMessage::getMessageId)
112+
.collect(toList());
113+
114+
LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " +
115+
"Failed messages %s", failedMessages.size(), messageIds));
116+
} else {
117+
throw new SQSBatchProcessingException(exceptions, failedMessages, successReturns);
107118
}
108119
}
109120

121+
private boolean isNonRetryableException(Exception exception, Class<? extends Exception>[] nonRetryableExceptions) {
122+
return Arrays.stream(nonRetryableExceptions)
123+
.anyMatch(aClass -> aClass.isInstance(exception));
124+
}
125+
110126
private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Exception> nonRetryableMessageToException) {
111127
Optional<String> dlqUrl = fetchDlqUrl(nonRetryableMessageToException);
112128

@@ -143,7 +159,7 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
143159
SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(builder -> builder.queueUrl(dlqUrl.get())
144160
.entries(dlqMessages));
145161

146-
LOG.debug(format("Response from send batch message to DLQ request %s", sendMessageBatchResponse));
162+
LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);
147163

148164
return true;
149165
}
@@ -190,7 +206,7 @@ private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
190206
.build();
191207

192208
DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);
193-
LOG.debug(format("Response from delete request %s", deleteMessageBatchResponse));
209+
LOG.debug("Response from delete request {}", deleteMessageBatchResponse);
194210
}
195211
}
196212

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ && placedOnSqsEventRequestHandler(pjp)) {
2929

3030
SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];
3131

32-
batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value(), sqsBatch.nonRetryableExceptions());
32+
batchProcessor(sqsEvent,
33+
sqsBatch.suppressException(),
34+
sqsBatch.value(),
35+
sqsBatch.deleteNonRetryableMessageFromQueue(),
36+
sqsBatch.nonRetryableExceptions());
3337
}
3438

3539
return pjp.proceed(proceedArgs);

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,36 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
242242
verify(sqsClient).sendMessageBatch(any(Consumer.class));
243243
}
244244

245+
@Test
246+
void shouldBatchProcessAndDeleteNonRetryableException() {
247+
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da";
248+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
249+
250+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
251+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
252+
" \"maxReceiveCount\": 2\n" +
253+
"}");
254+
255+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
256+
.attributes(attributes)
257+
.build());
258+
259+
List<String> batchProcessor = batchProcessor(event, false, (message) -> {
260+
if (failedId.equals(message.getMessageId())) {
261+
throw new IllegalStateException("Failed processing");
262+
}
263+
264+
interactionClient.listQueues();
265+
return "Success";
266+
}, true, IllegalStateException.class, IllegalArgumentException.class);
267+
268+
Assertions.assertThat(batchProcessor)
269+
.hasSize(1);
270+
271+
verify(sqsClient, times(0)).sendMessageBatch(any(Consumer.class));
272+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
273+
}
274+
245275
public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {
246276
@Override
247277
public String process(SQSEvent.SQSMessage message) {

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandler.java

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public String process(SQSMessage message) {
2626
throw new IllegalArgumentException("Invalid message and was moved to DLQ");
2727
}
2828

29+
if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) {
30+
throw new RuntimeException("Invalid message and should be reprocessed");
31+
}
32+
2933
mockedRandom.nextInt();
3034
return "Success";
3135
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package software.amazon.lambda.powertools.sqs.handlers;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import software.amazon.lambda.powertools.sqs.SqsBatch;
7+
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
8+
9+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
10+
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom;
11+
12+
public class SqsMessageHandlerWithNonRetryableHandlerWithDelete implements RequestHandler<SQSEvent, String> {
13+
14+
@Override
15+
@SqsBatch(value = InnerMessageHandler.class,
16+
nonRetryableExceptions = {IllegalStateException.class, IllegalArgumentException.class},
17+
deleteNonRetryableMessageFromQueue = true)
18+
public String handleRequest(final SQSEvent sqsEvent,
19+
final Context context) {
20+
return "Success";
21+
}
22+
23+
private class InnerMessageHandler implements SqsMessageHandler<Object> {
24+
25+
@Override
26+
public String process(SQSMessage message) {
27+
if(message.getMessageId().isEmpty()) {
28+
throw new IllegalArgumentException("Invalid message and was moved to DLQ");
29+
}
30+
31+
if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) {
32+
throw new RuntimeException("Invalid message and should be reprocessed");
33+
}
34+
35+
mockedRandom.nextInt();
36+
return "Success";
37+
}
38+
}
39+
}

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java

+134
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import org.junit.jupiter.api.BeforeEach;
1414
import org.junit.jupiter.api.Test;
15+
import org.mockito.ArgumentCaptor;
1516
import software.amazon.awssdk.services.sqs.SqsClient;
1617
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
1718
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
@@ -23,11 +24,13 @@
2324
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler;
2425
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler;
2526
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandler;
27+
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandlerWithDelete;
2628

2729
import static org.assertj.core.api.Assertions.assertThat;
2830
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2931
import static org.mockito.ArgumentMatchers.any;
3032
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.never;
3134
import static org.mockito.Mockito.reset;
3235
import static org.mockito.Mockito.times;
3336
import static org.mockito.Mockito.verify;
@@ -131,6 +134,137 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
131134
verify(sqsClient).sendMessageBatch(any(Consumer.class));
132135
}
133136

137+
@Test
138+
void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() {
139+
requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete();
140+
event.getRecords().get(0).setMessageId("");
141+
142+
requestHandler.handleRequest(event, context);
143+
144+
verify(mockedRandom).nextInt();
145+
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
146+
verify(sqsClient).deleteMessageBatch(captor.capture());
147+
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
148+
verify(sqsClient, never()).getQueueAttributes(any(GetQueueAttributesRequest.class));
149+
150+
assertThat(captor.getValue())
151+
.satisfies(deleteMessageBatchRequest -> assertThat(deleteMessageBatchRequest.entries())
152+
.hasSize(2)
153+
.extracting("id")
154+
.contains("", "2e1424d4-f796-459a-8184-9c92662be6da"));
155+
}
156+
157+
@Test
158+
void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionAndNoDlq() {
159+
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
160+
161+
event.getRecords().get(0).setMessageId("");
162+
event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp"));
163+
164+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
165+
.build());
166+
167+
assertThatExceptionOfType(SQSBatchProcessingException.class)
168+
.isThrownBy(() -> requestHandler.handleRequest(event, context))
169+
.satisfies(e -> {
170+
assertThat(e.getExceptions())
171+
.hasSize(1)
172+
.extracting("detailMessage")
173+
.containsExactly("Invalid message and was moved to DLQ");
174+
175+
assertThat(e.getFailures())
176+
.hasSize(1)
177+
.extracting("messageId")
178+
.containsExactly("");
179+
180+
assertThat(e.successMessageReturnValues())
181+
.hasSize(1)
182+
.contains("Success");
183+
});
184+
185+
verify(mockedRandom).nextInt();
186+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
187+
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
188+
}
189+
190+
@Test
191+
void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionWhenFailedParsingPolicy() {
192+
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
193+
event.getRecords().get(0).setMessageId("");
194+
event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp-queue"));
195+
196+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
197+
198+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "MalFormedRedrivePolicy");
199+
200+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
201+
.attributes(attributes)
202+
.build());
203+
204+
assertThatExceptionOfType(SQSBatchProcessingException.class)
205+
.isThrownBy(() -> requestHandler.handleRequest(event, context))
206+
.satisfies(e -> {
207+
assertThat(e.getExceptions())
208+
.hasSize(1)
209+
.extracting("detailMessage")
210+
.containsExactly("Invalid message and was moved to DLQ");
211+
212+
assertThat(e.getFailures())
213+
.hasSize(1)
214+
.extracting("messageId")
215+
.containsExactly("");
216+
217+
assertThat(e.successMessageReturnValues())
218+
.hasSize(1)
219+
.contains("Success");
220+
});
221+
222+
verify(mockedRandom).nextInt();
223+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
224+
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
225+
}
226+
227+
@Test
228+
void shouldBatchProcessAndMoveNonRetryableExceptionToDlqAndThrowException() throws IOException {
229+
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
230+
event = MAPPER.readValue(this.getClass().getResource("/threeMessageSqsBatchEvent.json"), SQSEvent.class);
231+
232+
event.getRecords().get(1).setMessageId("");
233+
234+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
235+
236+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
237+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
238+
" \"maxReceiveCount\": 2\n" +
239+
"}");
240+
241+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
242+
.attributes(attributes)
243+
.build());
244+
245+
assertThatExceptionOfType(SQSBatchProcessingException.class)
246+
.isThrownBy(() -> requestHandler.handleRequest(event, context))
247+
.satisfies(e -> {
248+
assertThat(e.getExceptions())
249+
.hasSize(1)
250+
.extracting("detailMessage")
251+
.containsExactly("Invalid message and should be reprocessed");
252+
253+
assertThat(e.getFailures())
254+
.hasSize(1)
255+
.extracting("messageId")
256+
.containsExactly("2e1424d4-f796-459a-9696-9c92662ba5da");
257+
258+
assertThat(e.successMessageReturnValues())
259+
.hasSize(1)
260+
.contains("Success");
261+
});
262+
263+
verify(mockedRandom).nextInt();
264+
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
265+
verify(sqsClient).sendMessageBatch(any(Consumer.class));
266+
}
267+
134268
private void setupContext() {
135269
when(context.getFunctionName()).thenReturn("testFunction");
136270
when(context.getInvokedFunctionArn()).thenReturn("testArn");

0 commit comments

Comments
 (0)