Skip to content

Commit e04ca71

Browse files
fix: support batch size greater than 10 processing (#667)
1 parent a3f435c commit e04ca71

File tree

4 files changed

+527
-24
lines changed

4 files changed

+527
-24
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java

+37-14
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import java.util.List;
77
import java.util.Map;
88
import java.util.Optional;
9-
9+
import java.util.function.Consumer;
10+
import java.util.stream.IntStream;
1011
import com.fasterxml.jackson.core.JsonProcessingException;
1112
import com.fasterxml.jackson.databind.JsonNode;
1213
import org.slf4j.Logger;
@@ -20,6 +21,7 @@
2021
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
2122
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
2223
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
24+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
2325
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
2426
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
2527
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
@@ -156,14 +158,20 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
156158
})
157159
.collect(toList());
158160

159-
SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(builder -> builder.queueUrl(dlqUrl.get())
160-
.entries(dlqMessages));
161+
batchRequest(dlqMessages, 10, entriesToSend -> {
162+
163+
SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(SendMessageBatchRequest.builder()
164+
.entries(entriesToSend)
165+
.queueUrl(dlqUrl.get())
166+
.build());
161167

162-
LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);
168+
LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);
169+
});
163170

164171
return true;
165172
}
166173

174+
167175
private Optional<String> fetchDlqUrl(Map<SQSMessage, Exception> nonRetryableMessageToException) {
168176
return nonRetryableMessageToException.keySet().stream()
169177
.findFirst()
@@ -197,19 +205,34 @@ private boolean hasFailures() {
197205

198206
private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
199207
if (!messages.isEmpty()) {
200-
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
201-
.queueUrl(url(messages.get(0).getEventSourceArn()))
202-
.entries(messages.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
203-
.id(m.getMessageId())
204-
.receiptHandle(m.getReceiptHandle())
205-
.build()).collect(toList()))
206-
.build();
207-
208-
DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);
209-
LOG.debug("Response from delete request {}", deleteMessageBatchResponse);
208+
209+
List<DeleteMessageBatchRequestEntry> entries = messages.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
210+
.id(m.getMessageId())
211+
.receiptHandle(m.getReceiptHandle())
212+
.build()).collect(toList());
213+
214+
batchRequest(entries, 10, entriesToDelete -> {
215+
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
216+
.queueUrl(url(messages.get(0).getEventSourceArn()))
217+
.entries(entriesToDelete)
218+
.build();
219+
220+
DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);
221+
222+
LOG.debug("Response from delete request {}", deleteMessageBatchResponse);
223+
});
210224
}
211225
}
212226

227+
private <T> void batchRequest(final List<T> listOFEntries,
228+
final int size,
229+
final Consumer<List<T>> batchLogic) {
230+
IntStream.range(0, listOFEntries.size())
231+
.filter(index -> index % size == 0)
232+
.mapToObj(index -> listOFEntries.subList(index, Math.min(index + size, listOFEntries.size())))
233+
.forEach(batchLogic);
234+
}
235+
213236
private String url(String queueArn) {
214237
String[] arnArray = queueArn.split(":");
215238
return String.format("https://sqs.%s.amazonaws.com/%s/%s", arnArray[3], arnArray[4], arnArray[5]);

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java

+80-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package software.amazon.lambda.powertools.sqs;
22

33
import java.io.IOException;
4+
import java.util.Collection;
45
import java.util.HashMap;
56
import java.util.List;
67
import java.util.function.Consumer;
7-
8+
import java.util.function.Function;
89
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
910
import com.fasterxml.jackson.databind.ObjectMapper;
1011
import org.assertj.core.api.Assertions;
@@ -18,8 +19,11 @@
1819
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
1920
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
2021
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
22+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
23+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
2124

2225
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
26+
import static org.assertj.core.api.Assertions.*;
2327
import static org.assertj.core.api.Assertions.assertThat;
2428
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2529
import static org.mockito.ArgumentMatchers.any;
@@ -236,10 +240,10 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
236240
return "Success";
237241
}, IllegalStateException.class, IllegalArgumentException.class);
238242

239-
Assertions.assertThat(batchProcessor)
243+
assertThat(batchProcessor)
240244
.hasSize(1);
241245

242-
verify(sqsClient).sendMessageBatch(any(Consumer.class));
246+
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
243247
}
244248

245249
@Test
@@ -265,13 +269,84 @@ void shouldBatchProcessAndDeleteNonRetryableException() {
265269
return "Success";
266270
}, true, IllegalStateException.class, IllegalArgumentException.class);
267271

268-
Assertions.assertThat(batchProcessor)
272+
assertThat(batchProcessor)
269273
.hasSize(1);
270274

271-
verify(sqsClient, times(0)).sendMessageBatch(any(Consumer.class));
275+
verify(sqsClient, times(0)).sendMessageBatch(any(SendMessageBatchRequest.class));
272276
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
273277
}
274278

279+
@Test
280+
void shouldDeleteSuccessfulMessageInBatchesOfT10orLess() throws IOException {
281+
SQSEvent batch25Message = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEventBatchSize25.json"), SQSEvent.class);
282+
283+
assertThatExceptionOfType(SQSBatchProcessingException.class)
284+
.isThrownBy(() -> batchProcessor(batch25Message, FailureSampleInnerSqsHandler.class))
285+
.satisfies(e -> {
286+
287+
assertThat(e.successMessageReturnValues())
288+
.hasSize(24)
289+
.contains("Success");
290+
291+
assertThat(e.getFailures())
292+
.hasSize(1)
293+
.extracting("messageId")
294+
.contains("2e1424d4-f796-459a-8184-9c92662be6da");
295+
296+
assertThat(e.getExceptions())
297+
.hasSize(1)
298+
.extracting("detailMessage")
299+
.contains("Failed processing");
300+
});
301+
302+
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
303+
304+
verify(sqsClient, times(3)).deleteMessageBatch(captor.capture());
305+
306+
assertThat(captor.getAllValues())
307+
.hasSize(3)
308+
.flatMap(DeleteMessageBatchRequest::entries)
309+
.hasSize(24);
310+
}
311+
312+
@Test
313+
void shouldBatchProcessAndMoveNonRetryableExceptionToDlqInBatchesOfT10orLess() throws IOException {
314+
SQSEvent batch25Message = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEventBatchSize25.json"), SQSEvent.class);
315+
316+
HashMap<QueueAttributeName, String> attributes = new HashMap<>();
317+
318+
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
319+
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
320+
" \"maxReceiveCount\": 2\n" +
321+
"}");
322+
323+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
324+
.attributes(attributes)
325+
.build());
326+
327+
List<String> batchProcessor = batchProcessor(batch25Message, (message) -> {
328+
if ("2e1424d4-f796-459a-8184-9c92662be6da".equals(message.getMessageId())) {
329+
interactionClient.listQueues();
330+
return "Success";
331+
}
332+
333+
throw new IllegalStateException("Failed processing");
334+
}, IllegalStateException.class, IllegalArgumentException.class);
335+
336+
assertThat(batchProcessor)
337+
.hasSize(1);
338+
339+
ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
340+
341+
342+
verify(sqsClient, times(3)).sendMessageBatch(captor.capture());
343+
344+
assertThat(captor.getAllValues())
345+
.hasSize(3)
346+
.flatMap(SendMessageBatchRequest::entries)
347+
.hasSize(24);
348+
}
349+
275350
public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {
276351
@Override
277352
public String process(SQSEvent.SQSMessage message) {

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
2020
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
2121
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
22+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
2223
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
2324
import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
2425
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler;
@@ -133,7 +134,7 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
133134

134135
verify(interactionClient).listQueues();
135136
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
136-
verify(sqsClient).sendMessageBatch(any(Consumer.class));
137+
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
137138
}
138139

139140
@Test
@@ -146,7 +147,7 @@ void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() {
146147
verify(interactionClient).listQueues();
147148
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
148149
verify(sqsClient).deleteMessageBatch(captor.capture());
149-
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
150+
verify(sqsClient, never()).sendMessageBatch(any(SendMessageBatchRequest.class));
150151
verify(sqsClient, never()).getQueueAttributes(any(GetQueueAttributesRequest.class));
151152

152153
assertThat(captor.getValue())
@@ -186,7 +187,7 @@ void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionAndNoDlq() {
186187

187188
verify(interactionClient).listQueues();
188189
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
189-
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
190+
verify(sqsClient, never()).sendMessageBatch(any(SendMessageBatchRequest.class));
190191
}
191192

192193
@Test
@@ -223,7 +224,7 @@ void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionWhenFailedPar
223224

224225
verify(interactionClient).listQueues();
225226
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
226-
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
227+
verify(sqsClient, never()).sendMessageBatch(any(SendMessageBatchRequest.class));
227228
}
228229

229230
@Test
@@ -264,7 +265,7 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlqAndThrowException() thro
264265

265266
verify(interactionClient).listQueues();
266267
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
267-
verify(sqsClient).sendMessageBatch(any(Consumer.class));
268+
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
268269
}
269270

270271
private void setupContext() {

0 commit comments

Comments
 (0)