Skip to content

Commit 1bbad5b

Browse files
committed
code review
1 parent 789b91b commit 1bbad5b

File tree

3 files changed

+19
-22
lines changed

3 files changed

+19
-22
lines changed

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.amazonaws.services.lambda.runtime.Context;
1818
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
1919
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
20-
import java.util.ArrayList;
2120
import java.util.List;
2221
import java.util.Optional;
2322
import java.util.function.BiConsumer;
@@ -49,16 +48,16 @@ public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord>
4948

5049
@Override
5150
public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
52-
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
53-
54-
for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) {
55-
processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
56-
}
51+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
52+
.stream()
53+
.map(eventRecord -> processBatchItem(eventRecord, context))
54+
.filter(Optional::isPresent)
55+
.map(Optional::get)
56+
.collect(Collectors.toList());
5757

58-
return response;
58+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
5959
}
6060

61-
6261
@Override
6362
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) {
6463
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.amazonaws.services.lambda.runtime.Context;
1919
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
2020
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
21-
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.Optional;
2423
import java.util.function.BiConsumer;
@@ -60,13 +59,14 @@ public KinesisStreamsBatchMessageHandler(BiConsumer<KinesisEvent.KinesisEventRec
6059

6160
@Override
6261
public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
63-
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
64-
65-
for (KinesisEvent.KinesisEventRecord eventRecord : event.getRecords()) {
66-
processBatchItem(eventRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
67-
}
62+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
63+
.stream()
64+
.map(eventRecord -> processBatchItem(eventRecord, context))
65+
.filter(Optional::isPresent)
66+
.map(Optional::get)
67+
.collect(Collectors.toList());
6868

69-
return response;
69+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
7070
}
7171

7272
@Override

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.ArrayList;
2121
import java.util.List;
2222
import java.util.Optional;
23-
import java.util.concurrent.atomic.AtomicBoolean;
2423
import java.util.function.BiConsumer;
2524
import java.util.function.Consumer;
2625
import java.util.stream.Collectors;
@@ -66,10 +65,10 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
6665
// If we are working on a FIFO queue, when any message fails we should stop processing and return the
6766
// rest of the batch as failed too. We use this variable to track when that has happened.
6867
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
69-
AtomicBoolean failWholeBatch = new AtomicBoolean(false);
68+
final Boolean[] failWholeBatch = {false};
7069

7170
int messageCursor = 0;
72-
for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) {
71+
for (; messageCursor < event.getRecords().size() && !failWholeBatch[0]; messageCursor++) {
7372
SQSEvent.SQSMessage message = event.getRecords().get(messageCursor);
7473

7574
String messageGroupId = message.getAttributes() != null ?
@@ -78,15 +77,15 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
7877
processBatchItem(message, context).ifPresent(batchItemFailure -> {
7978
response.getBatchItemFailures().add(batchItemFailure);
8079
if (messageGroupId != null) {
81-
failWholeBatch.set(true);
80+
failWholeBatch[0] = true;
8281
LOGGER.info(
8382
"A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
8483
, messageGroupId, message.getMessageId());
8584
}
8685
});
8786
}
8887

89-
if (failWholeBatch.get()) {
88+
if (failWholeBatch[0]) {
9089
// Add the remaining messages to the batch item failures
9190
event.getRecords()
9291
.subList(messageCursor, event.getRecords().size())
@@ -100,8 +99,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
10099
@Override
101100
public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) {
102101
if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) {
103-
LOGGER.warn("FIFO queues are not supported in parallel mode, proceeding in sequence");
104-
return processBatch(event, context);
102+
throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead");
105103
}
106104

107105
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();

0 commit comments

Comments
 (0)