20
20
import java .util .ArrayList ;
21
21
import java .util .List ;
22
22
import java .util .Optional ;
23
+ import java .util .concurrent .atomic .AtomicBoolean ;
23
24
import java .util .function .BiConsumer ;
24
25
import java .util .function .Consumer ;
25
26
import java .util .stream .Collectors ;
@@ -65,10 +66,10 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
65
66
// If we are working on a FIFO queue, when any message fails we should stop processing and return the
66
67
// rest of the batch as failed too. We use this variable to track when that has happened.
67
68
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
68
- final Boolean [] failWholeBatch = { false } ;
69
+ final AtomicBoolean failWholeBatch = new AtomicBoolean ( false ) ;
69
70
70
71
int messageCursor = 0 ;
71
- for (; messageCursor < event .getRecords ().size () && !failWholeBatch [ 0 ] ; messageCursor ++) {
72
+ for (; messageCursor < event .getRecords ().size () && !failWholeBatch . get () ; messageCursor ++) {
72
73
SQSEvent .SQSMessage message = event .getRecords ().get (messageCursor );
73
74
74
75
String messageGroupId = message .getAttributes () != null ?
@@ -77,15 +78,15 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
77
78
processBatchItem (message , context ).ifPresent (batchItemFailure -> {
78
79
response .getBatchItemFailures ().add (batchItemFailure );
79
80
if (messageGroupId != null ) {
80
- failWholeBatch [ 0 ] = true ;
81
+ failWholeBatch . set ( true ) ;
81
82
LOGGER .info (
82
83
"A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
83
84
, messageGroupId , message .getMessageId ());
84
85
}
85
86
});
86
87
}
87
88
88
- if (failWholeBatch [ 0 ] ) {
89
+ if (failWholeBatch . get () ) {
89
90
// Add the remaining messages to the batch item failures
90
91
event .getRecords ()
91
92
.subList (messageCursor , event .getRecords ().size ())
@@ -106,6 +107,7 @@ public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context)
106
107
List <SQSBatchResponse .BatchItemFailure > batchItemFailures = event .getRecords ()
107
108
.parallelStream () // Parallel processing
108
109
.map (sqsMessage -> {
110
+
109
111
multiThreadMDC .copyMDCToThread (Thread .currentThread ().getName ());
110
112
return processBatchItem (sqsMessage , context );
111
113
})
0 commit comments