@@ -496,9 +496,14 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
496
496
}
497
497
498
498
BatchContext batchContext = new BatchContext (client );
499
- Queue <SQSMessage > messagesToProcess = new LinkedList <>(event .getRecords ());
500
- while (!messagesToProcess .isEmpty ()) {
501
- SQSMessage message = messagesToProcess .remove ();
499
+ int offset = 0 ;
500
+ while (offset < event .getRecords ().size ()) {
501
+ // Get the current message and advance to the next. Doing this here
502
+ // makes it easier for us to know where we are up to if we have to
503
+ // break out of here early.
504
+ SQSMessage message = event .getRecords ().get (offset );
505
+ offset ++;
506
+
502
507
// If the batch hasn't failed, try process the message
503
508
try {
504
509
handlerReturn .add (handler .process (message ));
@@ -525,11 +530,15 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
525
530
526
531
// If we have a FIFO batch failure, unprocessed messages will remain on the queue
527
532
// past the failed message. We have to add these to the errors
528
- messagesToProcess .forEach (message -> {
529
- LOG .info ("Skipping message {} as another message with a message group failed in this batch" ,
530
- message .getMessageId ());
531
- batchContext .addFailure (message , new SkippedMessageDueToFailedBatchException ());
532
- });
533
+ if (offset < event .getRecords ().size ()) {
534
+ event .getRecords ()
535
+ .subList (offset , event .getRecords ().size ())
536
+ .forEach (message -> {
537
+ LOG .info ("Skipping message {} as another message with a message group failed in this batch" ,
538
+ message .getMessageId ());
539
+ batchContext .addFailure (message , new SkippedMessageDueToFailedBatchException ());
540
+ });
541
+ }
533
542
534
543
batchContext .processSuccessAndHandleFailed (handlerReturn , suppressException , deleteNonRetryableMessageFromQueue , nonRetryableExceptions );
535
544
return handlerReturn ;
0 commit comments