Skip to content

feat: Add Batch Processor module #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 95 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 84 commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
2c20023
Starting to sketch out shape of API for batch processor
scottgerring Jun 21, 2023
63ebcab
Merge branch 'main' into rfc-batch-processor
scottgerring Jun 22, 2023
c34f5d9
Merge branch 'main' into rfc-batch-processor
Jun 25, 2023
870ec47
Variant 1
Jun 25, 2023
5469287
Some more examples
Jun 25, 2023
0435d18
Add extra bit for handling message-specific mutation
Jun 25, 2023
6241014
Make clear what's not public
Jun 25, 2023
8168719
test with interfaces
jeromevdl Jun 28, 2023
b0675b7
move tests
jeromevdl Jun 28, 2023
5ae16f9
refactoring a bit
jeromevdl Jun 29, 2023
3127096
refactoring and adding FIFO
jeromevdl Jun 30, 2023
9b8a310
refactoring and adding FIFO
jeromevdl Jun 30, 2023
12cb97f
adding FIFO management
jeromevdl Jul 1, 2023
061bfb1
cleanup
jeromevdl Jul 1, 2023
6b07f78
add javadoc
jeromevdl Jul 3, 2023
7d2e1fb
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 3, 2023
b69c09b
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 11, 2023
cc28ce5
Flesh out builder option a bit
scottgerring Jul 11, 2023
916c26f
Flesh out a bit more
scottgerring Jul 11, 2023
96c30ff
more changes
scottgerring Jul 11, 2023
ee64d62
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 19, 2023
26d8da5
Leaning into the builder style. needs some more thought
scottgerring Jul 19, 2023
a9517f6
The shape of it is rightish
scottgerring Jul 20, 2023
d3ad219
Working working
scottgerring Jul 20, 2023
418e32b
Work
scottgerring Jul 20, 2023
a1e441c
Work on kinesis batch handler
scottgerring Jul 24, 2023
28d1f8d
More tests
scottgerring Jul 24, 2023
a660a8b
More tests and starting to add an example
scottgerring Jul 24, 2023
2332e2d
Working on batch
scottgerring Jul 25, 2023
4af594c
feat(batch): initial DdbBatchMessageHandler implementation
mriccia Jul 25, 2023
34a58e6
more
scottgerring Jul 26, 2023
ed161e9
fix pom.xml for powertools-examples-batch
mriccia Jul 26, 2023
f8812be
Add dynamodb example
mriccia Jul 26, 2023
53a5abe
Move template into subdir
scottgerring Jul 26, 2023
9872410
Better structure
scottgerring Jul 26, 2023
e53c1e9
tidy up
mriccia Jul 26, 2023
4ca1726
Trying to get kinesis going
scottgerring Jul 26, 2023
fb422da
Kinesis demo working
scottgerring Jul 26, 2023
a862daf
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 26, 2023
b8af8de
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 26, 2023
4118768
Updated readme
scottgerring Jul 26, 2023
e75de66
Deprecated everywhere
scottgerring Jul 26, 2023
1e7d305
Address initial review comments
scottgerring Jul 27, 2023
6ad7e7d
Add success tests for Kinesis/S3
scottgerring Jul 27, 2023
8f69551
Increase DDB coverage
scottgerring Jul 27, 2023
672ba51
Tell sonar to ignore dupes in examples
scottgerring Jul 27, 2023
70a08ba
Add docs
scottgerring Jul 27, 2023
dae6131
Add warning
scottgerring Jul 27, 2023
5e4d709
Merge remote-tracking branch 'origin/main' into rfc-batch-processor
scottgerring Jul 27, 2023
6d842da
More doco
scottgerring Jul 27, 2023
9de7d0a
Format
scottgerring Jul 27, 2023
0e95238
Docs good
scottgerring Jul 27, 2023
ba17efd
Disabling formatting check for now as its breaking the build and I ca…
scottgerring Jul 27, 2023
e87e6dc
Make checkstyle happy
scottgerring Jul 27, 2023
5c046a6
Add docs from heitor
scottgerring Jul 27, 2023
56af4d0
More docs changes
scottgerring Jul 28, 2023
e0fd524
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 28, 2023
c43df12
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 2, 2023
5d8be7a
Merged
scottgerring Aug 2, 2023
033a922
move ddb template in the right folder
mriccia Aug 2, 2023
13baa6a
Changes
scottgerring Aug 2, 2023
ca6cd09
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
00fe0b0
add items updates and deletions to ddb example
mriccia Aug 2, 2023
1f65ceb
Will it blend?
scottgerring Aug 2, 2023
ebc7630
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
bb74fe4
More changes
scottgerring Aug 2, 2023
76e51a6
e2e test handler
mriccia Aug 2, 2023
0e4b018
Try work for SQS only
scottgerring Aug 2, 2023
f8c9802
Merged
scottgerring Aug 2, 2023
c641d4b
More greatness
scottgerring Aug 2, 2023
0dbf1a3
Almost good
scottgerring Aug 2, 2023
7834a6e
SQS works
scottgerring Aug 2, 2023
062870f
Also kinesis e2e
scottgerring Aug 2, 2023
5548dc0
Lets try doing it with streams
scottgerring Aug 2, 2023
3cf034a
Try make it work with streams
scottgerring Aug 2, 2023
55cbbf4
Streams?
scottgerring Aug 3, 2023
6e44fe6
Make SQS test work
scottgerring Aug 3, 2023
0c9f8b8
SQS and Kinesis work
scottgerring Aug 3, 2023
4b13467
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 3, 2023
006b9e0
DynamoDB E2E works
scottgerring Aug 3, 2023
c3ae363
Formatting
scottgerring Aug 3, 2023
570d051
Try exclude e2e-tests from dupe checking
scottgerring Aug 3, 2023
6303a75
Rename sonar file
scottgerring Aug 3, 2023
f91c2c1
Formatting
scottgerring Aug 3, 2023
db0e7f1
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
fa9aa11
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
710a94c
Address review comments
scottgerring Aug 4, 2023
06373a7
Merge
scottgerring Aug 4, 2023
294313f
Missed one
scottgerring Aug 4, 2023
c63fb28
Formatting
scottgerring Aug 4, 2023
e27a31f
Cleanup doc linking
scottgerring Aug 4, 2023
0929fb1
More doco
scottgerring Aug 4, 2023
3d8aca8
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
1ff4658
Update batch.md
scottgerring Aug 4, 2023
83fdf12
Skip aspectj run
scottgerring Aug 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .sonarcloud.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Ignore code duplicates in the examples
sonar.cpd.exclusions=examples/**/*,powertools-e2e-tests/**/*
586 changes: 224 additions & 362 deletions docs/utilities/batch.md

Large diffs are not rendered by default.

489 changes: 489 additions & 0 deletions docs/utilities/sqs_batch.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ Each example can be copied from its subdirectory and used independently of the r
* [powertools-examples-idempotency](powertools-examples-idempotency) - An idempotent HTTP API
* [powertools-examples-parameters](powertools-examples-parameters) - Uses the parameters module to provide runtime parameters to a function
* [powertools-examples-serialization](powertools-examples-serialization) - Uses the serialization module to serialize and deserialize API Gateway & SQS payloads
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests (**Deprecated** - will be replaced by `powertools-examples-batch` in version 2 of this library)
* [powertools-examples-validation](powertools-examples-validation) - Uses the validation module to validate user requests received via API Gateway
* [powertools-examples-cloudformation](powertools-examples-cloudformation) - Deploys a Cloudformation custom resource
* [powertools-examples-batch](powertools-examples-batch) - Examples for each of the different batch processing deployments

## Working with AWS Serverless Application Model (SAM) Examples
Many of the examples use [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (SAM). To get started
Expand Down
1 change: 1 addition & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<module>powertools-examples-parameters</module>
<module>powertools-examples-serialization</module>
<module>powertools-examples-sqs</module>
<module>powertools-examples-batch</module>
<module>powertools-examples-validation</module>
<module>powertools-examples-cloudformation</module>
</modules>
Expand Down
35 changes: 35 additions & 0 deletions examples/powertools-examples-batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Powertools for AWS Lambda (Java) - Batch Example

This project contains examples of Lambda function using the batch processing module of Powertools for AWS Lambda (Java).
For more information on this module, please refer to the
[documentation](https://docs.powertools.aws.dev/lambda-java/utilities/batch/).

Three different examples and SAM deployments are included, covering each of the batch sources:

* [SQS](src/main/java/org/demo/batch/sqs) - SQS batch processing
* [Kinesis Streams](src/main/java/org/demo/batch/kinesis) - Kinesis Streams batch processing
* [DynamoDB Streams](src/main/java/org/demo/batch/dynamo) - DynamoDB Streams batch processing

## Deploy the sample application

This sample is based on Serverless Application Model (SAM). To deploy it, check out the instructions for getting
started with SAM in [the examples directory](../README.md)

This sample contains three different deployments, depending on which batch processor you'd like to use, you can
change to the subdirectory containing the example SAM template, and deploy. For instance, for the SQS batch
deployment:

```bash
cd deploy/sqs
sam build
sam deploy --guided
```

## Test the application

Each of the examples uses a Lambda scheduled every 5 minutes to push a batch, and a separate lambda to read it. To
see this in action, we can simply tail the logs of our stack:

```bash
sam logs --tail $STACK_NAME
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
DynamoDB Streams batch processing demo

Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Architectures:
- x86_64
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:
DynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
StreamSpecification:
StreamViewType: NEW_IMAGE


DemoDynamoDBWriter:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.dynamo.DynamoDBWriter::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: ddbstreams-demo
TABLE_NAME: !Ref DynamoDBTable
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref DynamoDBTable
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(1 minute)'
Name: !Join [ "-", [ "ddb-writer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Write records to DynamoDB via a Lambda function
Enabled: true

DemoDynamoDBStreamsConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.dynamo.DynamoDBStreamBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: ddbstreams-batch-demo
Policies: AWSLambdaDynamoDBExecutionRole
Events:
Stream:
Type: DynamoDB
Properties:
Stream: !GetAtt DynamoDBTable.StreamArn
BatchSize: 100
StartingPosition: TRIM_HORIZON

83 changes: 83 additions & 0 deletions examples/powertools-examples-batch/deploy/kinesis/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Kinesis batch processing demo

Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:

DemoKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

StreamConsumer:
Type: "AWS::Kinesis::StreamConsumer"
Properties:
StreamARN: !GetAtt DemoKinesisStream.Arn
ConsumerName: KinesisBatchHandlerConsumer

DemoKinesisSenderFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.kinesis.KinesisBatchSender::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: kinesis-batch-demo
STREAM_NAME: !Ref DemoKinesisStream
Policies:
- Statement:
- Sid: WriteToKinesis
Effect: Allow
Action:
- kinesis:PutRecords
- kinesis:DescribeStream
Resource: !GetAtt DemoKinesisStream.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to Kinesis via a Lambda function
Enabled: true

DemoKinesisConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.kinesis.KinesisBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: kinesis-demo
Events:
Kinesis:
Type: Kinesis
Properties:
Stream: !GetAtt StreamConsumer.ConsumerARN
StartingPosition: LATEST
BatchSize: 2

Outputs:
DemoKinesisQueue:
Description: "ARN for Kinesis Stream"
Value: !GetAtt DemoKinesisStream.Arn
DemoKinesisSenderFunction:
Description: "Kinesis Batch Sender - Lambda Function ARN"
Value: !GetAtt DemoKinesisSenderFunction.Arn
DemoSQSConsumerFunction:
Description: "SQS Batch Handler - Lambda Function ARN"
Value: !GetAtt DemoKinesisConsumerFunction.Arn

147 changes: 147 additions & 0 deletions examples/powertools-examples-batch/deploy/sqs/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sqs batch processing demo

Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow use of the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'

CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: alias/powertools-batch-sqs-demo
TargetKeyId: !Ref CustomerKey

DemoDlqSqsQueue:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey

DemoSqsQueue:
Type: AWS::SQS::Queue
Properties:
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- "DemoDlqSqsQueue"
- "Arn"
maxReceiveCount: 2
KmsMasterKeyId: !Ref CustomerKey

DemoSQSSenderFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsBatchSender::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-batch-demo
QUEUE_URL: !Ref DemoSqsQueue
Policies:
- Statement:
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to SQS via a Lambda function
Enabled: true

DemoSQSConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-demo
Policies:
- Statement:
- Sid: SQSDeleteGetAttribute
Effect: Allow
Action:
- sqs:DeleteMessageBatch
- sqs:GetQueueAttributes
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoDlqSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
Events:
MySQSEvent:
Type: SQS
Properties:
Queue: !GetAtt DemoSqsQueue.Arn
BatchSize: 2
MaximumBatchingWindowInSeconds: 300

Outputs:
DemoSqsQueue:
Description: "ARN for main SQS queue"
Value: !GetAtt DemoSqsQueue.Arn
DemoDlqSqsQueue:
Description: "ARN for DLQ"
Value: !GetAtt DemoDlqSqsQueue.Arn
DemoSQSSenderFunction:
Description: "SQS Batch Sender - Lambda Function ARN"
Value: !GetAtt DemoSQSSenderFunction.Arn
DemoSQSConsumerFunction:
Description: "SQS Batch Handler - Lambda Function ARN"
Value: !GetAtt DemoSQSConsumerFunction.Arn
DemoSQSConsumerFunctionRole:
Description: "Implicit IAM Role created for SQS Lambda Function ARN"
Value: !GetAtt DemoSQSConsumerFunctionRole.Arn
Loading