batchRequestEntries = IntStream.range(0, 5)
+ .mapToObj(value -> {
+ long id = random.nextLong();
+ float price = random.nextFloat();
+ Product product = new Product(id, "product-" + id, price);
+ try {
+
+ return SendMessageBatchRequestEntry.builder()
+ .id(scheduledEvent.getId() + value)
+ .messageBody(objectMapper.writeValueAsString(product))
+ .build();
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed serializing body", e);
+ throw new RuntimeException(e);
+ }
+ }).collect(toList());
+
+ SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder()
+ .queueUrl(queueUrl)
+ .entries(batchRequestEntries)
+ .build());
+
+ LOGGER.info("Sent Message {}", sendMessageBatchResponse);
+
+ return "Success";
+ }
+}
diff --git a/examples/powertools-examples-batch/src/main/resources/log4j2.xml b/examples/powertools-examples-batch/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..ea3ecf474
--- /dev/null
+++ b/examples/powertools-examples-batch/src/main/resources/log4j2.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 62d8d75ce..d54ece508 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -18,6 +18,9 @@ nav:
- utilities/validation.md
- utilities/custom_resources.md
- utilities/serialization.md
+ - Deprecated:
+ - utilities/sqs_large_message_handling.md
+ - utilities/sqs_batch.md
- Processes:
- processes/maintainers.md
diff --git a/pom.xml b/pom.xml
index 6a24501be..7d8659735 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
powertools-idempotency
powertools-large-messages
powertools-e2e-tests
+ powertools-batch
examples
diff --git a/powertools-batch/pom.xml b/powertools-batch/pom.xml
new file mode 100644
index 000000000..9e25dabd8
--- /dev/null
+++ b/powertools-batch/pom.xml
@@ -0,0 +1,68 @@
+
+
+ 4.0.0
+
+ software.amazon.lambda
+ powertools-parent
+ 1.17.0-SNAPSHOT
+
+
+
+
+
+ dev.aspectj
+ aspectj-maven-plugin
+
+ true
+
+
+
+
+
+ powertools-batch
+
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+ com.amazonaws
+ aws-lambda-java-core
+
+
+ software.amazon.lambda
+ powertools-serialization
+ ${project.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ com.amazonaws
+ aws-lambda-java-tests
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.mockito
+ mockito-inline
+ test
+
+
+
+
\ No newline at end of file
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageHandlerBuilder.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageHandlerBuilder.java
new file mode 100644
index 000000000..4ed44453b
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageHandlerBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch;
+
+import software.amazon.lambda.powertools.batch.builder.DynamoDbBatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.builder.KinesisBatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.builder.SqsBatchMessageHandlerBuilder;
+
+/**
+ * A builder-style interface we can use to build batch processing handlers for SQS, Kinesis Streams,
+ * and DynamoDB Streams batches. The batch processing handlers that are returned allow
+ * the user to easily process batches of messages, one-by-one, while offloading
+ * the common issues - failure handling, partial responses, deserialization -
+ * to the library.
+ *
+ * @see Powertools for AWS Lambda (Java) Batch Documentation
+ **/
+public class BatchMessageHandlerBuilder {
+
+ /**
+ * Build an SQS-batch message handler.
+ *
+ * @return A fluent builder interface to continue the building
+ */
+ public SqsBatchMessageHandlerBuilder withSqsBatchHandler() {
+ return new SqsBatchMessageHandlerBuilder();
+ }
+
+ /**
+ * Build a DynamoDB streams batch message handler.
+ *
+ * @return A fluent builder interface to continue the building
+ */
+ public DynamoDbBatchMessageHandlerBuilder withDynamoDbBatchHandler() {
+ return new DynamoDbBatchMessageHandlerBuilder();
+ }
+
+ /**
+ * Builds a Kinesis streams batch message handler.
+ *
+ * @return a fluent builder interface to continue the building
+ */
+ public KinesisBatchMessageHandlerBuilder withKinesisBatchHandler() {
+ return new KinesisBatchMessageHandlerBuilder();
+ }
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/AbstractBatchMessageHandlerBuilder.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/AbstractBatchMessageHandlerBuilder.java
new file mode 100644
index 000000000..9b0647770
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/AbstractBatchMessageHandlerBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.builder;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+
+/**
+ * An abstract class to capture common arguments used across all the message-binding-specific batch processing
+ * builders. The builders provide a fluent interface to configure the batch processors. Any arguments specific
+ * to a particular batch binding can be added to the child builder.
+ *
+ * We capture types for the various messages involved, so that we can provide an interface that makes
+ * sense for the concrete child.
+ *
+ * @param The type of a single message in the batch
+ * @param The type of the child builder. We need this to provide a fluent interface - see also getThis()
+ * @param The type of the Lambda batch event
+ * @param The type of the batch response we return to Lambda
+ */
+abstract class AbstractBatchMessageHandlerBuilder {
+ protected BiConsumer failureHandler;
+ protected Consumer successHandler;
+
+ /**
+ * Provides an (Optional!) success handler. A success handler is invoked
+ * once for each message after it has been processed by the user-provided
+ * handler.
+ *
+ * If the success handler throws, the item in the batch will be
+ * marked failed.
+ *
+ * @param handler The handler to invoke
+ */
+ public C withSuccessHandler(Consumer handler) {
+ this.successHandler = handler;
+ return getThis();
+ }
+
+ /**
+ * Provides an (Optional!) failure handler. A failure handler is invoked
+ * once for each message after it has failed to be processed by the
+ * user-provided handler. This gives the user's code a useful hook to do
+ * anything else that might have to be done in response to a failure - for
+ * instance, updating a metric, or writing a detailed log.
+ *
+ * Please note that this method has nothing to do with the partial batch
+ * failure mechanism. Regardless of whether a failure handler is
+ * specified, partial batch failures and responses to the Lambda environment
+ * are handled by the batch utility separately.
+ *
+ * @param handler The handler to invoke on failure
+ */
+ public C withFailureHandler(BiConsumer handler) {
+ this.failureHandler = handler;
+ return getThis();
+ }
+
+ /**
+ * Builds a BatchMessageHandler that can be used to process batches, given
+ * a user-defined handler to process each item in the batch. This variant
+ * takes a function that consumes a raw message and the Lambda context. This
+ * is useful for handlers that need access to the entire message object, not
+ * just the deserialized contents of the body.
+ *
+ * Note: If you don't need the Lambda context, use the variant of this function
+ * that does not require it.
+ *
+ * @param handler Takes a raw message - the underlying AWS Events Library event - to process.
+ * For instance for SQS this would be an SQSMessage.
+ * @return A BatchMessageHandler for processing the batch
+ */
+ public abstract BatchMessageHandler buildWithRawMessageHandler(BiConsumer handler);
+
+ /**
+ * Builds a BatchMessageHandler that can be used to process batches, given
+ * a user-defined handler to process each item in the batch. This variant
+ * takes a function that consumes a raw message and the Lambda context. This
+ * is useful for handlers that need access to the entire message object, not
+ * just the deserialized contents of the body.
+ *
+ * @param handler Takes a raw message - the underlying AWS Events Library event - to process.
+ * For instance for SQS this would be an SQSMessage.
+ * @return A BatchMessageHandler for processing the batch
+ */
+ public BatchMessageHandler buildWithRawMessageHandler(Consumer handler) {
+ return buildWithRawMessageHandler((f, c) -> handler.accept(f));
+ }
+
+ /**
+ * Builds a BatchMessageHandler that can be used to process batches, given
+ * a user-defined handler to process each item in the batch. This variant
+ * takes a function that consumes the deserialized body of the given message
+ * and the lambda context. If deserialization fails, it will be treated as
+ * failure of the processing of that item in the batch.
+ * Note: If you don't need the Lambda context, use the variant of this function
+ * that does not require it.
+ *
+ * @param handler Processes the deserialized body of the message
+ * @return A BatchMessageHandler for processing the batch
+ */
+ public abstract BatchMessageHandler buildWithMessageHandler(BiConsumer handler,
+ Class messageClass);
+
+ /**
+ * Builds a BatchMessageHandler that can be used to process batches, given
+ * a user-defined handler to process each item in the batch. This variant
+ * takes a function that consumes the deserialized body of the given message
+ * If deserialization fails, it will be treated as
+ * failure of the processing of that item in the batch.
+ * Note: If you don't need the Lambda context, use the variant of this function
+ * that does not require it.
+ *
+ * @param handler Processes the deserialized body of the message
+ * @return A BatchMessageHandler for processing the batch
+ */
+ public BatchMessageHandler buildWithMessageHandler(Consumer handler, Class messageClass) {
+ return buildWithMessageHandler((f, c) -> handler.accept(f), messageClass);
+ }
+
+
+ /**
+ * Used to chain the fluent builder interface through the child classes.
+ *
+ * @return This
+ */
+ protected abstract C getThis();
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/DynamoDbBatchMessageHandlerBuilder.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/DynamoDbBatchMessageHandlerBuilder.java
new file mode 100644
index 000000000..8513322b3
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/DynamoDbBatchMessageHandlerBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.builder;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import java.util.function.BiConsumer;
+import software.amazon.lambda.powertools.batch.exception.DeserializationNotSupportedException;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.batch.handler.DynamoDbBatchMessageHandler;
+
+/**
+ * Builds a batch processor for processing DynamoDB Streams batch events
+ **/
+public class DynamoDbBatchMessageHandlerBuilder
+ extends AbstractBatchMessageHandlerBuilder {
+
+
+ @Override
+ public BatchMessageHandler buildWithRawMessageHandler(
+ BiConsumer rawMessageHandler) {
+ return new DynamoDbBatchMessageHandler(
+ this.successHandler,
+ this.failureHandler,
+ rawMessageHandler);
+ }
+
+ @Override
+ public BatchMessageHandler buildWithMessageHandler(
+ BiConsumer handler, Class messageClass) {
+ // The DDB provider streams DynamoDB changes, and therefore does not have a customizable payload
+ throw new DeserializationNotSupportedException();
+ }
+
+ @Override
+ protected DynamoDbBatchMessageHandlerBuilder getThis() {
+ return this;
+ }
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/KinesisBatchMessageHandlerBuilder.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/KinesisBatchMessageHandlerBuilder.java
new file mode 100644
index 000000000..30bfcab65
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/KinesisBatchMessageHandlerBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.builder;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import java.util.function.BiConsumer;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.batch.handler.KinesisStreamsBatchMessageHandler;
+
+/**
+ * Builds a batch processor for processing Kinesis Streams batch events
+ */
+public class KinesisBatchMessageHandlerBuilder
+ extends AbstractBatchMessageHandlerBuilder {
+ @Override
+ public BatchMessageHandler buildWithRawMessageHandler(
+ BiConsumer rawMessageHandler) {
+ return new KinesisStreamsBatchMessageHandler(
+ rawMessageHandler,
+ null,
+ null,
+ successHandler,
+ failureHandler);
+ }
+
+ @Override
+ public BatchMessageHandler buildWithMessageHandler(
+ BiConsumer messageHandler, Class messageClass) {
+ return new KinesisStreamsBatchMessageHandler<>(
+ null,
+ messageHandler,
+ messageClass,
+ successHandler,
+ failureHandler);
+ }
+
+ @Override
+ protected KinesisBatchMessageHandlerBuilder getThis() {
+ return this;
+ }
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/SqsBatchMessageHandlerBuilder.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/SqsBatchMessageHandlerBuilder.java
new file mode 100644
index 000000000..ee2dc23f6
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/builder/SqsBatchMessageHandlerBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.builder;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import java.util.function.BiConsumer;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.batch.handler.SqsBatchMessageHandler;
+
+/**
+ * Builds a batch processor for the SQS event source.
+ */
+public class SqsBatchMessageHandlerBuilder extends AbstractBatchMessageHandlerBuilder {
+
+
+ @Override
+ public BatchMessageHandler buildWithRawMessageHandler(
+ BiConsumer rawMessageHandler) {
+ return new SqsBatchMessageHandler(
+ null,
+ null,
+ rawMessageHandler,
+ successHandler,
+ failureHandler
+ );
+ }
+
+ @Override
+ public BatchMessageHandler buildWithMessageHandler(
+ BiConsumer messageHandler, Class messageClass) {
+ return new SqsBatchMessageHandler<>(
+ messageHandler,
+ messageClass,
+ null,
+ successHandler,
+ failureHandler
+ );
+ }
+
+
+ @Override
+ protected SqsBatchMessageHandlerBuilder getThis() {
+ return this;
+ }
+
+
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/exception/DeserializationNotSupportedException.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/exception/DeserializationNotSupportedException.java
new file mode 100644
index 000000000..6f3206c99
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/exception/DeserializationNotSupportedException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.exception;
+
+/**
+ * Thrown by message handlers that do not support deserializing arbitrary payload
+ * contents. This is the case for instance with DynamoDB Streams, which stream
+ * changesets about user-defined data, but not the user-defined data models themselves.
+ */
+public class DeserializationNotSupportedException extends RuntimeException {
+
+ public DeserializationNotSupportedException() {
+ super("This BatchMessageHandler has a fixed schema and does not support user-defined types");
+ }
+
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java
new file mode 100644
index 000000000..730211feb
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.handler;
+
+import com.amazonaws.services.lambda.runtime.Context;
+
+/**
+ * The basic interface a batch message handler must meet.
+ *
+ * @param The type of the Lambda batch event
+ * @param The type of the lambda batch response
+ */
+public interface BatchMessageHandler {
+
+ /**
+ * Processes the given batch returning a partial batch
+ * response indicating the success and failure of individual
+ * messages within the batch.
+ *
+ * @param event The Lambda event containing the batch to process
+ * @param context The lambda context
+ * @return A partial batch response
+ */
+ public abstract R processBatch(E event, Context context);
+
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java
new file mode 100644
index 000000000..aa6eba839
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.handler;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A batch message processor for DynamoDB Streams batches.
+ *
+ * @see DynamoDB Streams batch failure reporting
+ */
+public class DynamoDbBatchMessageHandler implements BatchMessageHandler {
+ private final static Logger LOGGER = LoggerFactory.getLogger(DynamoDbBatchMessageHandler.class);
+
+ private final Consumer successHandler;
+ private final BiConsumer failureHandler;
+ private final BiConsumer rawMessageHandler;
+
+ public DynamoDbBatchMessageHandler(Consumer successHandler,
+ BiConsumer failureHandler,
+ BiConsumer rawMessageHandler) {
+ this.successHandler = successHandler;
+ this.failureHandler = failureHandler;
+ this.rawMessageHandler = rawMessageHandler;
+ }
+
+ @Override
+ public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
+ List batchFailures = new ArrayList<>();
+
+ for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) {
+ try {
+
+ rawMessageHandler.accept(record, context);
+ // Report success if we have a handler
+ if (this.successHandler != null) {
+ this.successHandler.accept(record);
+ }
+ } catch (Throwable t) {
+ String sequenceNumber = record.getDynamodb().getSequenceNumber();
+ LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
+ sequenceNumber, t.getMessage());
+ LOGGER.error("Error was", t);
+ batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber));
+
+ // Report failure if we have a handler
+ if (this.failureHandler != null) {
+ // A failing failure handler is no reason to fail the batch
+ try {
+ this.failureHandler.accept(record, t);
+ } catch (Throwable t2) {
+ LOGGER.warn("failureHandler threw handling failure", t2);
+ }
+ }
+ }
+ }
+
+ return new StreamsEventResponse(batchFailures);
+ }
+}
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java
new file mode 100644
index 000000000..fe1aaf354
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.handler;
+
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.utilities.EventDeserializer;
+
+/**
+ * A batch message processor for Kinesis Streams batch processing.
+ *
+ * Refer to Kinesis Batch failure reporting
+ *
+ * @param The user-defined type of the Kinesis record payload
+ */
+public class KinesisStreamsBatchMessageHandler implements BatchMessageHandler {
+ private final static Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsBatchMessageHandler.class);
+
+ private final BiConsumer rawMessageHandler;
+ private final BiConsumer messageHandler;
+ private final Class messageClass;
+ private final Consumer successHandler;
+ private final BiConsumer failureHandler;
+
+ public KinesisStreamsBatchMessageHandler(BiConsumer rawMessageHandler,
+ BiConsumer messageHandler,
+ Class messageClass,
+ Consumer successHandler,
+ BiConsumer failureHandler) {
+
+ this.rawMessageHandler = rawMessageHandler;
+ this.messageHandler = messageHandler;
+ this.messageClass = messageClass;
+ this.successHandler = successHandler;
+ this.failureHandler = failureHandler;
+ }
+
+ @Override
+ public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
+ List batchFailures = new ArrayList<>();
+
+ for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
+ try {
+ if (this.rawMessageHandler != null) {
+ rawMessageHandler.accept(record, context);
+ } else {
+ M messageDeserialized = EventDeserializer.extractDataFrom(record).as(messageClass);
+ messageHandler.accept(messageDeserialized, context);
+ }
+
+ // Report success if we have a handler
+ if (this.successHandler != null) {
+ this.successHandler.accept(record);
+ }
+ } catch (Throwable t) {
+ String sequenceNumber = record.getEventID();
+ LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures",
+ sequenceNumber, t.getMessage());
+ LOGGER.error("Error was", t);
+
+ batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber()));
+
+ // Report failure if we have a handler
+ if (this.failureHandler != null) {
+ // A failing failure handler is no reason to fail the batch
+ try {
+ this.failureHandler.accept(record, t);
+ } catch (Throwable t2) {
+ LOGGER.warn("failureHandler threw handling failure", t2);
+ }
+ }
+ }
+ }
+
+ return new StreamsEventResponse(batchFailures);
+ }
+}
+
diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java
new file mode 100644
index 000000000..b3c416a69
--- /dev/null
+++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.handler;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import java.util.ArrayList;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.utilities.EventDeserializer;
+
+/**
+ * A batch message processor for SQS batches.
+ *
+ * @param The user-defined type of the message payload
+ * @see SQS Batch failure reporting
+ */
+public class SqsBatchMessageHandler implements BatchMessageHandler {
+ private final static Logger LOGGER = LoggerFactory.getLogger(SqsBatchMessageHandler.class);
+
+ // The attribute on an SQS-FIFO message used to record the message group ID
+ // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sample-fifo-queues-message-event
+ private final static String MESSAGE_GROUP_ID_KEY = "MessageGroupId";
+
+ private final Class messageClass;
+ private final BiConsumer messageHandler;
+ private final BiConsumer rawMessageHandler;
+ private final Consumer successHandler;
+ private final BiConsumer failureHandler;
+
+ public SqsBatchMessageHandler(BiConsumer messageHandler, Class messageClass,
+ BiConsumer rawMessageHandler,
+ Consumer successHandler,
+ BiConsumer failureHandler) {
+ this.messageHandler = messageHandler;
+ this.messageClass = messageClass;
+ this.rawMessageHandler = rawMessageHandler;
+ this.successHandler = successHandler;
+ this.failureHandler = failureHandler;
+ }
+
+ @Override
+ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
+ SQSBatchResponse response = SQSBatchResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
+
+ // If we are working on a FIFO queue, when any message fails we should stop processing and return the
+ // rest of the batch as failed too. We use this variable to track when that has happened.
+ // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
+ boolean failWholeBatch = false;
+
+ int messageCursor = 0;
+ for (; messageCursor < event.getRecords().size() && !failWholeBatch; messageCursor++) {
+ SQSEvent.SQSMessage message = event.getRecords().get(messageCursor);
+
+ String messageGroupId = message.getAttributes() != null ?
+ message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null;
+
+ try {
+ if (this.rawMessageHandler != null) {
+ rawMessageHandler.accept(message, context);
+ } else {
+ M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass);
+ messageHandler.accept(messageDeserialized, context);
+ }
+
+ // Report success if we have a handler
+ if (this.successHandler != null) {
+ this.successHandler.accept(message);
+ }
+
+ } catch (Throwable t) {
+ LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures",
+ message.getMessageId(), t.getMessage());
+ LOGGER.error("Error was", t);
+
+ response.getBatchItemFailures()
+ .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId())
+ .build());
+ if (messageGroupId != null) {
+ failWholeBatch = true;
+ LOGGER.info(
+ "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
+ , messageGroupId, message.getMessageId());
+ }
+
+ // Report failure if we have a handler
+ if (this.failureHandler != null) {
+ // A failing failure handler is no reason to fail the batch
+ try {
+ this.failureHandler.accept(message, t);
+ } catch (Throwable t2) {
+ LOGGER.warn("failureHandler threw handling failure", t2);
+ }
+ }
+
+ }
+ }
+
+ if (failWholeBatch) {
+ // Add the remaining messages to the batch item failures
+ event.getRecords()
+ .subList(messageCursor, event.getRecords().size())
+ .forEach(message -> response.getBatchItemFailures()
+ .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId())
+ .build()));
+ }
+ return response;
+ }
+}
diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java
new file mode 100644
index 000000000..9e2c211e2
--- /dev/null
+++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.mockito.Mock;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+
+public class DdbBatchProcessorTest {
+
+ @Mock
+ private Context context;
+
+ private void processMessageSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) {
+ // Great success
+ }
+
+ private void processMessageFailsForFixedMessage(DynamodbEvent.DynamodbStreamRecord record, Context context) {
+ if (record.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) {
+ throw new RuntimeException("fake exception");
+ }
+ }
+
+ @ParameterizedTest
+ @Event(value = "dynamo_event.json", type = DynamodbEvent.class)
+ public void batchProcessingSucceedsAndReturns(DynamodbEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withDynamoDbBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(0);
+ }
+
+ @ParameterizedTest
+ @Event(value = "dynamo_event.json", type = DynamodbEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withDynamoDbBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageFailsForFixedMessage);
+
+ // Act
+ StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1);
+ StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091");
+ }
+
+ @ParameterizedTest
+ @Event(value = "dynamo_event.json", type = DynamodbEvent.class)
+ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) {
+ // Arrange
+ AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withDynamoDbBatchHandler()
+ .withFailureHandler((m, e) -> {
+ if (m.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) {
+ wasCalledAndFailed.set(true);
+ throw new RuntimeException("Success handler throws");
+ }
+ })
+ .buildWithRawMessageHandler(this::processMessageFailsForFixedMessage);
+
+ // Act
+ StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(dynamodbBatchResponse).isNotNull();
+ assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1);
+ assertThat(wasCalledAndFailed.get()).isTrue();
+ StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091");
+ }
+
+ @ParameterizedTest
+ @Event(value = "dynamo_event.json", type = DynamodbEvent.class)
+ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) {
+ // Arrange
+ AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withDynamoDbBatchHandler()
+ .withSuccessHandler((e) -> {
+ if (e.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) {
+ wasCalledAndFailed.set(true);
+ throw new RuntimeException("Success handler throws");
+ }
+ })
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(dynamodbBatchResponse).isNotNull();
+ assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1);
+ assertThat(wasCalledAndFailed.get()).isTrue();
+ StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091");
+ }
+
+}
diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java
new file mode 100644
index 000000000..d78638e1d
--- /dev/null
+++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.mockito.Mock;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.batch.model.Product;
+
+public class KinesisBatchProcessorTest {
+
+ @Mock
+ private Context context;
+
+ private void processMessageSucceeds(KinesisEvent.KinesisEventRecord record, Context context) {
+ // Great success
+ }
+
+ private void processMessageFailsForFixedMessage(KinesisEvent.KinesisEventRecord record, Context context) {
+ if (record.getKinesis().getSequenceNumber()
+ .equals("49545115243490985018280067714973144582180062593244200961")) {
+ throw new RuntimeException("fake exception");
+ }
+ }
+
+ // A handler that throws an exception for _one_ of the deserialized products in the same messages
+ public void processMessageFailsForFixedProduct(Product product, Context context) {
+ if (product.getId() == 1234) {
+ throw new RuntimeException("fake exception");
+ }
+ }
+
+ @ParameterizedTest
+ @Event(value = "kinesis_event.json", type = KinesisEvent.class)
+ public void batchProcessingSucceedsAndReturns(KinesisEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(0);
+ }
+
+ @ParameterizedTest
+ @Event(value = "kinesis_event.json", type = KinesisEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageFailsForFixedMessage);
+
+ // Act
+ StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1);
+ StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo(
+ "49545115243490985018280067714973144582180062593244200961");
+ }
+
+ @ParameterizedTest
+ @Event(value = "kinesis_event.json", type = KinesisEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .buildWithMessageHandler(this::processMessageFailsForFixedProduct, Product.class);
+
+ // Act
+ StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1);
+ StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo(
+ "49545115243490985018280067714973144582180062593244200961");
+ }
+
+ @ParameterizedTest
+ @Event(value = "kinesis_event.json", type = KinesisEvent.class)
+ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) {
+ // Arrange
+ AtomicBoolean wasCalled = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .withFailureHandler((e, ex) -> {
+ wasCalled.set(true);
+ throw new RuntimeException("Well, this doesn't look great");
+ })
+ .buildWithMessageHandler(this::processMessageFailsForFixedProduct, Product.class);
+
+ // Act
+ StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(kinesisBatchResponse).isNotNull();
+ assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1);
+ assertThat(wasCalled.get()).isTrue();
+ StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo(
+ "49545115243490985018280067714973144582180062593244200961");
+ }
+
+ @ParameterizedTest
+ @Event(value = "kinesis_event.json", type = KinesisEvent.class)
+ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) {
+ // Arrange
+ AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .withSuccessHandler((e) -> {
+ if (e.getKinesis().getSequenceNumber()
+ .equals("49545115243490985018280067714973144582180062593244200961")) {
+ wasCalledAndFailed.set(true);
+ throw new RuntimeException("Success handler throws");
+ }
+ })
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(kinesisBatchResponse).isNotNull();
+ assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1);
+ assertThat(wasCalledAndFailed.get()).isTrue();
+ StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo(
+ "49545115243490985018280067714973144582180062593244200961");
+ }
+
+}
diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java
new file mode 100644
index 000000000..2f9429fa3
--- /dev/null
+++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.mockito.Mock;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.batch.model.Product;
+
+public class SQSBatchProcessorTest {
+ @Mock
+ private Context context;
+
+ // A handler that works
+ private void processMessageSucceeds(SQSEvent.SQSMessage sqsMessage) {
+ }
+
+ // A handler that throws an exception for _one_ of the sample messages
+ private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) {
+ if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) {
+ throw new RuntimeException("fake exception");
+ }
+ }
+
+ // A handler that throws an exception for _one_ of the deserialized products in the same messages
+ public void processMessageFailsForFixedProduct(Product product, Context context) {
+ if (product.getId() == 12345) {
+ throw new RuntimeException("fake exception");
+ }
+ }
+
+ @ParameterizedTest
+ @Event(value = "sqs_event.json", type = SQSEvent.class)
+ public void batchProcessingSucceedsAndReturns(SQSEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(0);
+ }
+
+
+ @ParameterizedTest
+ @Event(value = "sqs_event.json", type = SQSEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageFailsForFixedMessage);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(1);
+ SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ }
+
+ @ParameterizedTest
+ @Event(value = "sqs_fifo_event.json", type = SQSEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) {
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .buildWithRawMessageHandler(this::processMessageFailsForFixedMessage);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(2);
+ SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(1);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("f9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ }
+
+
+ @ParameterizedTest
+ @Event(value = "sqs_event.json", type = SQSEvent.class)
+ public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) {
+
+ // Arrange
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .buildWithMessageHandler(this::processMessageFailsForFixedProduct, Product.class);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+ assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(1);
+
+ // Assert
+ SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ }
+
+ @ParameterizedTest
+ @Event(value = "sqs_event.json", type = SQSEvent.class)
+ public void failingFailureHandlerShouldntFailBatch(SQSEvent event) {
+ // Arrange
+ AtomicBoolean wasCalled = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .withFailureHandler((e, ex) -> {
+ wasCalled.set(true);
+ throw new RuntimeException("Well, this doesn't look great");
+ })
+ .buildWithMessageHandler(this::processMessageFailsForFixedProduct, Product.class);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(sqsBatchResponse).isNotNull();
+ assertThat(wasCalled.get()).isTrue();
+ SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ }
+
+ @ParameterizedTest
+ @Event(value = "sqs_event.json", type = SQSEvent.class)
+ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) {
+ // Arrange
+ AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false);
+ BatchMessageHandler handler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .withSuccessHandler((e) -> {
+ if (e.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) {
+ wasCalledAndFailed.set(true);
+ throw new RuntimeException("Success handler throws");
+ }
+ })
+ .buildWithRawMessageHandler(this::processMessageSucceeds);
+
+ // Act
+ SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context);
+
+ // Assert
+ assertThat(sqsBatchResponse).isNotNull();
+ assertThat(wasCalledAndFailed.get()).isTrue();
+ SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0);
+ assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54");
+ }
+
+
+}
diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Basket.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Basket.java
new file mode 100644
index 000000000..6009e79d6
--- /dev/null
+++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Basket.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2022 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.batch.model;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public class Basket {
+ private List products = new ArrayList<>();
+
+ public Basket() {
+ }
+
+ public Basket(Product... p) {
+ products.addAll(Arrays.asList(p));
+ }
+
+ public List getProducts() {
+ return products;
+ }
+
+ public void setProducts(List products) {
+ this.products = products;
+ }
+
+ public void add(Product product) {
+ products.add(product);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Basket basket = (Basket) o;
+ return products.equals(basket.products);
+ }
+
+ @Override
+ public String toString() {
+ return "Basket{" +
+ "products=" + products +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(products);
+ }
+}
diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Product.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Product.java
new file mode 100644
index 000000000..2695578f9
--- /dev/null
+++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/model/Product.java
@@ -0,0 +1,84 @@
+package software.amazon.lambda.powertools.batch.model;
+
+/*
+ * Copyright 2022 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import java.util.Objects;
+
+public class Product {
+ private long id;
+
+ private String name;
+
+ private double price;
+
+ public Product() {
+ }
+
+ public Product(long id, String name, double price) {
+ this.id = id;
+ this.name = name;
+ this.price = price;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+
+ public void setPrice(double price) {
+ this.price = price;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Product product = (Product) o;
+ return id == product.id && Double.compare(product.price, price) == 0 && Objects.equals(name, product.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, price);
+ }
+
+ @Override
+ public String toString() {
+ return "Product{" +
+ "id=" + id +
+ ", name='" + name + '\'' +
+ ", price=" + price +
+ '}';
+ }
+}
diff --git a/powertools-batch/src/test/resources/dynamo_event.json b/powertools-batch/src/test/resources/dynamo_event.json
new file mode 100644
index 000000000..f28ce0e6e
--- /dev/null
+++ b/powertools-batch/src/test/resources/dynamo_event.json
@@ -0,0 +1,97 @@
+{
+ "Records": [
+ {
+ "eventID": "c4ca4238a0b923820dcc509a6f75849b",
+ "eventName": "INSERT",
+ "eventVersion": "1.1",
+ "eventSource": "aws:dynamodb",
+ "awsRegion": "eu-central-1",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "New item!"
+ },
+ "Id": {
+ "N": "101"
+ }
+ },
+ "ApproximateCreationDateTime": 1428537600,
+ "SequenceNumber": "4421584500000000017450439091",
+ "SizeBytes": 26,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
+ "userIdentity": {
+ "principalId": "dynamodb.amazonaws.com",
+ "type": "Service"
+ }
+ },
+ {
+ "eventID": "c81e728d9d4c2f636f067f89cc14862c",
+ "eventName": "MODIFY",
+ "eventVersion": "1.1",
+ "eventSource": "aws:dynamodb",
+ "awsRegion": "eu-central-1",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "This item has changed"
+ },
+ "Id": {
+ "N": "101"
+ }
+ },
+ "OldImage": {
+ "Message": {
+ "S": "New item!"
+ },
+ "Id": {
+ "N": "101"
+ }
+ },
+ "ApproximateCreationDateTime": 1428537600,
+ "SequenceNumber": "4421584500000000017450439092",
+ "SizeBytes": 59,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899"
+ },
+ {
+ "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3",
+ "eventName": "REMOVE",
+ "eventVersion": "1.1",
+ "eventSource": "aws:dynamodb",
+ "awsRegion": "eu-central-1",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "OldImage": {
+ "Message": {
+ "S": "This item has changed"
+ },
+ "Id": {
+ "N": "101"
+ }
+ },
+ "ApproximateCreationDateTime": 1428537600,
+ "SequenceNumber": "4421584500000000017450439093",
+ "SizeBytes": 38,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/powertools-batch/src/test/resources/kinesis_event.json b/powertools-batch/src/test/resources/kinesis_event.json
new file mode 100644
index 000000000..c9068da9b
--- /dev/null
+++ b/powertools-batch/src/test/resources/kinesis_event.json
@@ -0,0 +1,38 @@
+{
+ "Records": [
+ {
+ "kinesis": {
+ "partitionKey": "partitionKey-03",
+ "kinesisSchemaVersion": "1.0",
+ "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=",
+ "sequenceNumber": "49545115243490985018280067714973144582180062593244200961",
+ "approximateArrivalTimestamp": 1428537600,
+ "encryptionType": "NONE"
+ },
+ "eventSource": "aws:kinesis",
+ "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
+ "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
+ "eventVersion": "1.0",
+ "eventName": "aws:kinesis:record",
+ "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
+ "awsRegion": "eu-central-1"
+ },
+ {
+ "kinesis": {
+ "partitionKey": "partitionKey-03",
+ "kinesisSchemaVersion": "1.0",
+ "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==",
+ "sequenceNumber": "49545115243490985018280067714973144582180062593244200962",
+ "approximateArrivalTimestamp": 1428537600,
+ "encryptionType": "NONE"
+ },
+ "eventSource": "aws:kinesis",
+ "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
+ "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
+ "eventVersion": "1.0",
+ "eventName": "aws:kinesis:record",
+ "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
+ "awsRegion": "eu-central-1"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/powertools-batch/src/test/resources/sqs_event.json b/powertools-batch/src/test/resources/sqs_event.json
new file mode 100644
index 000000000..7fdad096f
--- /dev/null
+++ b/powertools-batch/src/test/resources/sqs_event.json
@@ -0,0 +1,55 @@
+{
+ "Records": [
+ {
+ "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ },
+ {
+ "messageId": "e9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 12345,\n \"name\": \"product5\",\n \"price\": 45\n}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ },
+ {
+ "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 123456,\n \"name\": \"product6\",\n \"price\": 46\n}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/powertools-batch/src/test/resources/sqs_fifo_event.json b/powertools-batch/src/test/resources/sqs_fifo_event.json
new file mode 100644
index 000000000..e5abb1e5a
--- /dev/null
+++ b/powertools-batch/src/test/resources/sqs_fifo_event.json
@@ -0,0 +1,58 @@
+{
+ "Records": [
+ {
+ "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}",
+ "attributes": {
+ "MessageGroupId": "groupA",
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ },
+ {
+ "messageId": "e9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 12345,\n \"name\": \"product5\",\n \"price\": 45\n}",
+ "attributes": {
+ "MessageGroupId": "groupA",
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ },
+ {
+ "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54",
+ "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==",
+ "body": "{\n \"id\": 123456,\n \"name\": \"product6\",\n \"price\": 46\n}",
+ "attributes": {
+ "MessageGroupId": "groupA",
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1601975706495",
+ "SenderId": "AROAIFU437PVZ5L2J53F5",
+ "ApproximateFirstReceiveTimestamp": "1601975706499"
+ },
+ "messageAttributes": {
+ },
+ "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda",
+ "awsRegion": "eu-central-1"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/powertools-e2e-tests/handlers/batch/pom.xml b/powertools-e2e-tests/handlers/batch/pom.xml
new file mode 100644
index 000000000..995121e2a
--- /dev/null
+++ b/powertools-e2e-tests/handlers/batch/pom.xml
@@ -0,0 +1,72 @@
+
+ 4.0.0
+
+
+ software.amazon.lambda
+ e2e-test-handlers-parent
+ 1.0.0
+
+
+ e2e-test-handler-batch
+ jar
+ A Lambda function using Powertools for AWS Lambda (Java) batch
+
+
+
+ software.amazon.lambda
+ powertools-batch
+
+
+ software.amazon.lambda
+ powertools-logging
+
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+ com.amazonaws
+ aws-lambda-java-serialization
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+
+
+ software.amazon.awssdk
+ dynamodb
+
+
+
+
+
+
+ dev.aspectj
+ aspectj-maven-plugin
+
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+ ${maven.compiler.target}
+
+
+ software.amazon.lambda
+ powertools-logging
+
+
+
+
+
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
diff --git a/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/Function.java b/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/Function.java
new file mode 100644
index 000000000..64f5a02c2
--- /dev/null
+++ b/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/Function.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.e2e;
+
+import com.amazonaws.lambda.thirdparty.com.fasterxml.jackson.databind.ObjectMapper;
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import com.amazonaws.services.lambda.runtime.serialization.PojoSerializer;
+import com.amazonaws.services.lambda.runtime.serialization.events.LambdaEventSerializers;
+import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.util.IOUtils;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.e2e.model.Product;
+import software.amazon.lambda.powertools.logging.Logging;
+import software.amazon.lambda.powertools.utilities.JsonConfig;
+
+import javax.management.Attribute;
+
+
+public class Function implements RequestHandler {
+
+ private final static Logger LOGGER = LogManager.getLogger(Function.class);
+
+ private final BatchMessageHandler sqsHandler;
+ private final BatchMessageHandler kinesisHandler;
+ private final BatchMessageHandler ddbHandler;
+ private final String ddbOutputTable;
+ private DynamoDbClient ddbClient;
+
+ public Function() {
+ sqsHandler = new BatchMessageHandlerBuilder()
+ .withSqsBatchHandler()
+ .buildWithMessageHandler(this::processProductMessage, Product.class);
+
+ kinesisHandler = new BatchMessageHandlerBuilder()
+ .withKinesisBatchHandler()
+ .buildWithMessageHandler(this::processProductMessage, Product.class);
+
+ ddbHandler = new BatchMessageHandlerBuilder()
+ .withDynamoDbBatchHandler()
+ .buildWithRawMessageHandler(this::processDdbMessage);
+
+ this.ddbOutputTable = System.getenv("TABLE_FOR_ASYNC_TESTS");
+ }
+
+ private void processProductMessage(Product p, Context c) {
+ LOGGER.info("Processing product " + p);
+
+ // TODO - write product details to output table
+ ddbClient = DynamoDbClient.builder()
+ .build();
+ Map results = new HashMap<>();
+ results.put("functionName", AttributeValue.builder()
+ .s(c.getFunctionName())
+ .build());
+ results.put("id", AttributeValue.builder()
+ .s(Long.toString(p.getId()))
+ .build());
+ results.put("name", AttributeValue.builder()
+ .s(p.getName())
+ .build());
+ results.put("price", AttributeValue.builder()
+ .n(Double.toString(p.getPrice()))
+ .build());
+ ddbClient.putItem(PutItemRequest.builder()
+ .tableName(ddbOutputTable)
+ .item(results)
+ .build());
+ }
+
+ private void processDdbMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
+ LOGGER.info("Processing DynamoDB Stream Record" + dynamodbStreamRecord);
+
+ ddbClient = DynamoDbClient.builder()
+ .build();
+
+ String id = dynamodbStreamRecord.getDynamodb().getKeys().get("id").getS();
+ LOGGER.info("Incoming ID is " + id);
+
+ Map results = new HashMap<>();
+ results.put("functionName", AttributeValue.builder()
+ .s(context.getFunctionName())
+ .build());
+ results.put("id", AttributeValue.builder()
+ .s(id)
+ .build());
+
+ ddbClient.putItem(PutItemRequest.builder()
+ .tableName(ddbOutputTable)
+ .item(results)
+ .build());
+ }
+
+ public Object createResult(String input, Context context) {
+
+ LOGGER.info(input);
+
+ PojoSerializer serializer =
+ LambdaEventSerializers.serializerFor(SQSEvent.class, this.getClass().getClassLoader());
+ SQSEvent event = serializer.fromJson(input);
+ if (event.getRecords().get(0).getEventSource().equals("aws:sqs")) {
+ LOGGER.info("Running for SQS");
+ LOGGER.info(event);
+ return sqsHandler.processBatch(event, context);
+ }
+
+ PojoSerializer kinesisSerializer =
+ LambdaEventSerializers.serializerFor(KinesisEvent.class, this.getClass().getClassLoader());
+ KinesisEvent kinesisEvent = kinesisSerializer.fromJson(input);
+ if (kinesisEvent.getRecords().get(0).getEventSource().equals("aws:kinesis")) {
+ LOGGER.info("Running for Kinesis");
+ return kinesisHandler.processBatch(kinesisEvent, context);
+ }
+
+ // Well, let's try dynamo
+ PojoSerializer ddbSerializer =
+ LambdaEventSerializers.serializerFor(DynamodbEvent.class, this.getClass().getClassLoader());
+ LOGGER.info("Running for DynamoDB");
+ DynamodbEvent ddbEvent = ddbSerializer.fromJson(input);
+ return ddbHandler.processBatch(ddbEvent, context);
+ }
+
+ @Override
+ public Object handleRequest(InputStream inputStream, Context context) {
+
+ String input = new BufferedReader(
+ new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ return createResult(input, context);
+ }
+}
diff --git a/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/model/Product.java b/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/model/Product.java
new file mode 100644
index 000000000..74bb5ff9f
--- /dev/null
+++ b/powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/model/Product.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.e2e.model;
+
+public class Product {
+ private long id;
+
+ private String name;
+
+ private double price;
+
+ public Product() {
+ }
+
+ public Product(long id, String name, double price) {
+ this.id = id;
+ this.name = name;
+ this.price = price;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+
+ public void setPrice(double price) {
+ this.price = price;
+ }
+}
diff --git a/powertools-e2e-tests/handlers/batch/src/main/resources/log4j2.xml b/powertools-e2e-tests/handlers/batch/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..8925f70b9
--- /dev/null
+++ b/powertools-e2e-tests/handlers/batch/src/main/resources/log4j2.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/powertools-e2e-tests/handlers/pom.xml b/powertools-e2e-tests/handlers/pom.xml
index 4dd8cbb45..6e82c7aec 100644
--- a/powertools-e2e-tests/handlers/pom.xml
+++ b/powertools-e2e-tests/handlers/pom.xml
@@ -16,6 +16,7 @@
1.8
1.2.2
+ 1.1.2
3.11.2
3.5.0
1.13.1
@@ -72,6 +73,11 @@
powertools-large-messages
${lambda.powertools.version}
+
+ software.amazon.lambda
+ powertools-batch
+ ${lambda.powertools.version}
+
com.amazonaws
aws-lambda-java-core
@@ -82,6 +88,11 @@
aws-lambda-java-events
${lambda.java.events}
+
+ com.amazonaws
+ aws-lambda-java-serialization
+ ${lambda.java.serialization}
+
org.apache.logging.log4j
log4j-slf4j2-impl
diff --git a/powertools-e2e-tests/pom.xml b/powertools-e2e-tests/pom.xml
index 2c802edc3..f3194c163 100644
--- a/powertools-e2e-tests/pom.xml
+++ b/powertools-e2e-tests/pom.xml
@@ -57,7 +57,12 @@
${aws.sdk.version}
test
-
+
+ software.amazon.awssdk
+ kinesis
+ ${aws.sdk.version}
+ test
+
software.amazon.awssdk
cloudwatch
diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/BatchE2ET.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/BatchE2ET.java
new file mode 100644
index 000000000..c5f74594d
--- /dev/null
+++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/BatchE2ET.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static software.amazon.lambda.powertools.testutils.Infrastructure.FUNCTION_NAME_OUTPUT;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.lambda.powertools.testutils.Infrastructure;
+import software.amazon.lambda.powertools.utilities.JsonConfig;
+
+public class BatchE2ET {
+ private static final SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
+ private static final Region region = Region.of(System.getProperty("AWS_DEFAULT_REGION", "eu-west-1"));
+ private static Infrastructure infrastructure;
+ private static String functionName;
+ private static String queueUrl;
+ private static String kinesisStreamName;
+
+ private static ObjectMapper objectMapper;
+ private static String outputTable;
+ private static DynamoDbClient ddbClient;
+ private static SqsClient sqsClient;
+ private static KinesisClient kinesisClient;
+ private static String ddbStreamsTestTable;
+ private final List testProducts;
+
+ public BatchE2ET() {
+ testProducts = Arrays.asList(
+ new Product(1, "product1", 1.23),
+ new Product(2, "product2", 4.56),
+ new Product(3, "product3", 6.78)
+ );
+ }
+
+ @BeforeAll
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ public static void setup() {
+ String random = UUID.randomUUID().toString().substring(0, 6);
+ String queueName = "batchqueue" + random;
+ kinesisStreamName = "batchstream" + random;
+ ddbStreamsTestTable = "ddbstreams" + random;
+
+ objectMapper = JsonConfig.get().getObjectMapper();
+
+ infrastructure = Infrastructure.builder()
+ .testName(BatchE2ET.class.getSimpleName())
+ .pathToFunction("batch")
+ .queue(queueName)
+ .ddbStreamsTableName(ddbStreamsTestTable)
+ .kinesisStream(kinesisStreamName)
+ .build();
+
+ Map outputs = infrastructure.deploy();
+ functionName = outputs.get(FUNCTION_NAME_OUTPUT);
+ queueUrl = outputs.get("QueueURL");
+ kinesisStreamName = outputs.get("KinesisStreamName");
+ outputTable = outputs.get("TableNameForAsyncTests");
+ ddbStreamsTestTable = outputs.get("DdbStreamsTestTable");
+
+ ddbClient = DynamoDbClient.builder()
+ .region(region)
+ .httpClient(httpClient)
+ .build();
+
+ // GIVEN
+ sqsClient = SqsClient.builder()
+ .httpClient(httpClient)
+ .region(region)
+ .build();
+ kinesisClient = KinesisClient.builder()
+ .httpClient(httpClient)
+ .region(region)
+ .build();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ if (infrastructure != null) {
+ infrastructure.destroy();
+ }
+ }
+
+ @AfterEach
+ public void cleanUpTest() {
+ // Delete everything in the output table
+ ScanResponse items = ddbClient.scan(ScanRequest.builder()
+ .tableName(outputTable)
+ .build());
+
+ for (Map item : items.items()) {
+ HashMap key = new HashMap() {
+ {
+ put("functionName", AttributeValue.builder()
+ .s(item.get("functionName").s())
+ .build());
+ put("id", AttributeValue.builder()
+ .s(item.get("id").s())
+ .build());
+ }
+ };
+
+ ddbClient.deleteItem(DeleteItemRequest.builder()
+ .tableName(outputTable)
+ .key(key)
+ .build());
+ }
+ }
+
+ @Test
+ public void sqsBatchProcessingSucceeds() throws InterruptedException {
+ List entries = testProducts.stream()
+ .map(p -> {
+ try {
+ return SendMessageBatchRequestEntry.builder()
+ .id(p.getName())
+ .messageBody(objectMapper.writeValueAsString(p))
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
+ // WHEN
+ sqsClient.sendMessageBatch(SendMessageBatchRequest.builder()
+ .entries(entries)
+ .queueUrl(queueUrl)
+ .build());
+ Thread.sleep(30000); // wait for function to be executed
+
+ // THEN
+ ScanResponse items = ddbClient.scan(ScanRequest.builder()
+ .tableName(outputTable)
+ .build());
+ validateAllItemsHandled(items);
+ }
+
+ @Test
+ public void kinesisBatchProcessingSucceeds() throws InterruptedException {
+ List entries = testProducts.stream()
+ .map(p -> {
+ try {
+ return PutRecordsRequestEntry.builder()
+ .partitionKey("1")
+ .data(SdkBytes.fromUtf8String(objectMapper.writeValueAsString(p)))
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
+ // WHEN
+ PutRecordsResponse result = kinesisClient.putRecords(PutRecordsRequest.builder()
+ .streamName(kinesisStreamName)
+ .records(entries)
+ .build());
+ Thread.sleep(30000); // wait for function to be executed
+
+ // THEN
+ ScanResponse items = ddbClient.scan(ScanRequest.builder()
+ .tableName(outputTable)
+ .build());
+ validateAllItemsHandled(items);
+ }
+
+ @Test
+ public void ddbStreamsBatchProcessingSucceeds() throws InterruptedException {
+ // GIVEN
+ String theId = "my-test-id";
+
+ // WHEN
+ ddbClient.putItem(PutItemRequest.builder()
+ .tableName(ddbStreamsTestTable)
+ .item(new HashMap() {
+ {
+ put("id", AttributeValue.builder()
+ .s(theId)
+ .build());
+ }
+ })
+ .build());
+ Thread.sleep(90000); // wait for function to be executed
+
+ // THEN
+ ScanResponse items = ddbClient.scan(ScanRequest.builder()
+ .tableName(outputTable)
+ .build());
+
+ assertThat(items.count()).isEqualTo(1);
+ assertThat(items.items().get(0).get("id").s()).isEqualTo(theId);
+ }
+
+ private void validateAllItemsHandled(ScanResponse items) {
+ for (Product p : testProducts) {
+ boolean foundIt = false;
+ for (Map a : items.items()) {
+ if (a.get("id").s().equals(Long.toString(p.id))) {
+ foundIt = true;
+ }
+ }
+ assertThat(foundIt).isTrue();
+ }
+ }
+
+ class Product {
+ private long id;
+
+ private String name;
+
+ private double price;
+
+ public Product() {
+ }
+
+ public Product(long id, String name, double price) {
+ this.id = id;
+ this.name = name;
+ this.price = price;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+ }
+}
diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/testutils/Infrastructure.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/testutils/Infrastructure.java
index 996f49bd4..2a1af093c 100644
--- a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/testutils/Infrastructure.java
+++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/testutils/Infrastructure.java
@@ -45,14 +45,16 @@
import software.amazon.awscdk.services.appconfig.CfnDeploymentStrategy;
import software.amazon.awscdk.services.appconfig.CfnEnvironment;
import software.amazon.awscdk.services.appconfig.CfnHostedConfigurationVersion;
-import software.amazon.awscdk.services.dynamodb.Attribute;
-import software.amazon.awscdk.services.dynamodb.AttributeType;
-import software.amazon.awscdk.services.dynamodb.BillingMode;
-import software.amazon.awscdk.services.dynamodb.Table;
+import software.amazon.awscdk.services.dynamodb.*;
import software.amazon.awscdk.services.iam.PolicyStatement;
+import software.amazon.awscdk.services.kinesis.Stream;
+import software.amazon.awscdk.services.kinesis.StreamMode;
import software.amazon.awscdk.services.lambda.Code;
import software.amazon.awscdk.services.lambda.Function;
+import software.amazon.awscdk.services.lambda.StartingPosition;
import software.amazon.awscdk.services.lambda.Tracing;
+import software.amazon.awscdk.services.lambda.eventsources.DynamoEventSource;
+import software.amazon.awscdk.services.lambda.eventsources.KinesisEventSource;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.LogGroup;
import software.amazon.awscdk.services.logs.RetentionDays;
@@ -110,8 +112,9 @@ public class Infrastructure {
private final AppConfig appConfig;
private final SdkHttpClient httpClient;
private final String queue;
+ private final String kinesisStream;
private final String largeMessagesBucket;
-
+ private String ddbStreamsTableName;
private String functionName;
private Object cfnTemplate;
private String cfnAssetDirectory;
@@ -126,7 +129,9 @@ private Infrastructure(Builder builder) {
this.idempotencyTable = builder.idemPotencyTable;
this.appConfig = builder.appConfig;
this.queue = builder.queue;
+ this.kinesisStream = builder.kinesisStream;
this.largeMessagesBucket = builder.largeMessagesBucket;
+ this.ddbStreamsTableName = builder.ddbStreamsTableName;
this.app = new App();
this.stack = createStackWithLambda();
@@ -279,7 +284,12 @@ private Stack createStackWithLambda() {
.maxReceiveCount(1) // do not retry in case of error
.build();
sqsQueue.grantConsumeMessages(function);
- SqsEventSource sqsEventSource = SqsEventSource.Builder.create(sqsQueue).enabled(true).batchSize(1).build();
+ SqsEventSource sqsEventSource = SqsEventSource.Builder
+ .create(sqsQueue)
+ .enabled(true)
+ .reportBatchItemFailures(true)
+ .batchSize(1)
+ .build();
function.addEventSource(sqsEventSource);
CfnOutput.Builder
.create(stack, "QueueURL")
@@ -287,6 +297,46 @@ private Stack createStackWithLambda() {
.build();
createTableForAsyncTests = true;
}
+ if (!StringUtils.isEmpty(kinesisStream)) {
+ Stream stream = Stream.Builder
+ .create(stack, "KinesisStream")
+ .streamMode(StreamMode.ON_DEMAND)
+ .streamName(kinesisStream)
+ .build();
+
+ stream.grantRead(function);
+ KinesisEventSource kinesisEventSource = KinesisEventSource.Builder
+ .create(stream)
+ .enabled(true)
+ .batchSize(3)
+ .reportBatchItemFailures(true)
+ .startingPosition(StartingPosition.TRIM_HORIZON)
+ .maxBatchingWindow(Duration.seconds(1))
+ .build();
+ function.addEventSource(kinesisEventSource);
+ CfnOutput.Builder
+ .create(stack, "KinesisStreamName")
+ .value(stream.getStreamName())
+ .build();
+ }
+
+ if (!StringUtils.isEmpty(ddbStreamsTableName)) {
+ Table ddbStreamsTable = Table.Builder.create(stack, "DDBStreamsTable")
+ .tableName(ddbStreamsTableName)
+ .stream(StreamViewType.KEYS_ONLY)
+ .removalPolicy(RemovalPolicy.DESTROY)
+ .partitionKey(Attribute.builder().name("id").type(AttributeType.STRING).build())
+ .build();
+
+ DynamoEventSource ddbEventSource = DynamoEventSource.Builder.create(ddbStreamsTable)
+ .batchSize(1)
+ .startingPosition(StartingPosition.TRIM_HORIZON)
+ .maxBatchingWindow(Duration.seconds(1))
+ .reportBatchItemFailures(true)
+ .build();
+ function.addEventSource(ddbEventSource);
+ CfnOutput.Builder.create(stack, "DdbStreamsTestTable").value(ddbStreamsTable.getTableName()).build();
+ }
if (!StringUtils.isEmpty(largeMessagesBucket)) {
Bucket offloadBucket = Bucket.Builder
@@ -451,6 +501,8 @@ public static class Builder {
private Map environmentVariables = new HashMap<>();
private String idemPotencyTable;
private String queue;
+ private String kinesisStream;
+ private String ddbStreamsTableName;
private Builder() {
getJavaRuntime();
@@ -526,6 +578,16 @@ public Builder queue(String queue) {
return this;
}
+ public Builder kinesisStream(String stream) {
+ this.kinesisStream = stream;
+ return this;
+ }
+
+ public Builder ddbStreamsTableName(String tableName) {
+ this.ddbStreamsTableName = tableName;
+ return this;
+ }
+
public Builder largeMessagesBucket(String largeMessagesBucket) {
this.largeMessagesBucket = largeMessagesBucket;
return this;
diff --git a/powertools-serialization/src/main/java/software/amazon/lambda/powertools/utilities/EventDeserializer.java b/powertools-serialization/src/main/java/software/amazon/lambda/powertools/utilities/EventDeserializer.java
index 22712e8ce..13ad4d28f 100644
--- a/powertools-serialization/src/main/java/software/amazon/lambda/powertools/utilities/EventDeserializer.java
+++ b/powertools-serialization/src/main/java/software/amazon/lambda/powertools/utilities/EventDeserializer.java
@@ -33,7 +33,9 @@
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.util.List;
@@ -96,6 +98,8 @@ public static EventPart extractDataFrom(Object object) {
return new EventPart(event.getRecords().stream()
.map(SQSEvent.SQSMessage::getBody)
.collect(Collectors.toList()));
+ } else if (object instanceof SQSEvent.SQSMessage) {
+ return new EventPart(((SQSEvent.SQSMessage) object).getBody());
} else if (object instanceof ScheduledEvent) {
ScheduledEvent event = (ScheduledEvent) object;
return new EventPart(event.getDetail());
@@ -113,6 +117,8 @@ public static EventPart extractDataFrom(Object object) {
return new EventPart(event.getRecords().stream()
.map(r -> decode(r.getKinesis().getData()))
.collect(Collectors.toList()));
+ } else if (object instanceof KinesisEvent.KinesisEventRecord) {
+ return new EventPart(decode(((KinesisEvent.KinesisEventRecord)object).getKinesis().getData()));
} else if (object instanceof KinesisFirehoseEvent) {
KinesisFirehoseEvent event = (KinesisFirehoseEvent) object;
return new EventPart(event.getRecords().stream()
@@ -214,6 +220,17 @@ public T as(Class clazz) {
}
}
+ public M as() {
+ TypeReference typeRef = new TypeReference() {};
+
+ try {
+ JsonParser parser = JsonConfig.get().getObjectMapper().createParser(content);
+ return JsonConfig.get().getObjectMapper().reader().readValue(parser, typeRef);
+ } catch (IOException e) {
+ throw new EventDeserializationException("Cannot load the event as " + typeRef, e);
+ }
+ };
+
/**
* Deserialize this part of event from JSON to a list of objects of type T
*
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatch.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatch.java
index d0ffe6a73..4378fa707 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatch.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatch.java
@@ -23,6 +23,10 @@
import java.lang.annotation.Target;
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* {@link SqsBatch} is used to process batch messages in {@link SQSEvent}
*
*
@@ -87,6 +91,7 @@
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
+@Deprecated
public @interface SqsBatch {
Class extends SqsMessageHandler> value();
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
index c838180fd..1f00edf17 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java
@@ -119,6 +119,11 @@ public static void overrideS3Client(S3Client s3Client) {
}
/**
+ *
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -146,12 +151,17 @@ public static void overrideS3Client(S3Client s3Client) {
* @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message.
* @throws SQSBatchProcessingException if some messages fail during processing.
*/
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final Class extends SqsMessageHandler> handler) {
return batchProcessor(event, false, handler);
}
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -200,6 +210,7 @@ public static List batchProcessor(final SQSEvent event,
* @see Amazon SQS dead-letter queues
*/
@SafeVarargs
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final Class extends SqsMessageHandler> handler,
final Class extends Exception>... nonRetryableExceptions) {
@@ -207,6 +218,10 @@ public static List batchProcessor(final SQSEvent event,
}
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -232,6 +247,7 @@ public static List batchProcessor(final SQSEvent event,
* @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message.
* @throws SQSBatchProcessingException if some messages fail during processing and no suppression enabled.
*/
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final boolean suppressException,
final Class extends SqsMessageHandler> handler) {
@@ -241,6 +257,10 @@ public static List batchProcessor(final SQSEvent event,
}
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -291,6 +311,7 @@ public static List batchProcessor(final SQSEvent event,
* @see Amazon SQS dead-letter queues
*/
@SafeVarargs
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final boolean suppressException,
final Class extends SqsMessageHandler> handler,
@@ -301,6 +322,10 @@ public static List batchProcessor(final SQSEvent event,
}
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -355,6 +380,7 @@ public static List batchProcessor(final SQSEvent event,
* @see Amazon SQS dead-letter queues
*/
@SafeVarargs
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final boolean suppressException,
final Class extends SqsMessageHandler> handler,
@@ -367,6 +393,10 @@ public static List batchProcessor(final SQSEvent event,
}
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -394,6 +424,7 @@ public static List batchProcessor(final SQSEvent event,
* @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message-
* @throws SQSBatchProcessingException if some messages fail during processing.
*/
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final SqsMessageHandler handler) {
return batchProcessor(event, false, handler);
@@ -401,6 +432,10 @@ public static List batchProcessor(final SQSEvent event,
/**
+ * @deprecated
+ * @see software.amazon.lambda.powertools.batch in powertools-batch module.
+ * Will be removed in V2.
+ *
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
*
@@ -450,6 +485,7 @@ public static List batchProcessor(final SQSEvent event,
* @see Amazon SQS dead-letter queues
*/
@SafeVarargs
+ @Deprecated
public static List batchProcessor(final SQSEvent event,
final SqsMessageHandler