Skip to content

Commit d0c7f91

Browse files
scottgerringScott Gerringjeromevdlmriccia
authored
feat: Add Batch Processor module (#1317)
* Starting to sketch out shape of API for batch processor * Variant 1 * Some more examples * Add extra bit for handling message-specific mutation * Make clear what's not public * test with interfaces * move tests * refactoring a bit * refactoring and adding FIFO * refactoring and adding FIFO * adding FIFO management * cleanup * add javadoc * Flesh out builder option a bit * Flesh out a bit more * more changes * Leaning into the builder style. needs some more thought * The shape of it is rightish * Working working * Work * Work on kinesis batch handler * More tests * More tests and starting to add an example * Working on batch * feat(batch): initial DdbBatchMessageHandler implementation * more * fix pom.xml for powertools-examples-batch * Add dynamodb example * Move template into subdir * Better structure * tidy up * Trying to get kinesis going * Kinesis demo working * Updated readme * Deprecated everywhere * Address initial review comments * Add success tests for Kinesis/S3 * Increase DDB coverage * Tell sonar to ignore dupes in examples * Add docs * Add warning * More doco * Format * Docs good * Disabling formatting check for now as its breaking the build and I can't work out how to autoapply it from intellij properly * Make checkstyle happy * Add docs from heitor * More docs changes * move ddb template in the right folder * Changes * add items updates and deletions to ddb example * Will it blend? * More changes * e2e test handler * Try work for SQS only * More greatness * Almost good * SQS works * Also kinesis e2e * Lets try doing it with streams * Try make it work with streams * Streams? * Make SQS test work * SQS and Kinesis work * DynamoDB E2E works * Formatting * Try exclude e2e-tests from dupe checking * Rename sonar file * Formatting * Update docs/utilities/batch.md Co-authored-by: Jérôme Van Der Linden <[email protected]> * Update docs/utilities/batch.md Co-authored-by: Jérôme Van Der Linden <[email protected]> * Address review comments * Missed one * Formatting * Cleanup doc linking * More doco * Update docs/utilities/batch.md Co-authored-by: Jérôme Van Der Linden <[email protected]> * Update batch.md Address review comments * Skip aspectj run --------- Co-authored-by: Scott Gerring <[email protected]> Co-authored-by: Jerome Van Der Linden <[email protected]> Co-authored-by: Michele Ricciardi <[email protected]> Co-authored-by: Jérôme Van Der Linden <[email protected]>
1 parent fc3e971 commit d0c7f91

File tree

53 files changed

+4460
-358
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4460
-358
lines changed

.sonarcloud.properties

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Ignore code duplicates in the examples
2+
sonar.cpd.exclusions=examples/**/*,powertools-e2e-tests/**/*

docs/utilities/batch.md

+458-347
Large diffs are not rendered by default.

docs/utilities/sqs_batch.md

+489
Large diffs are not rendered by default.

docs/utilities/sqs_large_message_handling.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
---
2-
title: SQS Large Message Handling
2+
title: SQS Large Message Handling (Deprecated)
33
description: Utility
44
---
55

66
!!! warning
7-
This module is now deprecated and will be removed in version 2.
8-
See [Large Message Handling](large_messages.md) for the new module (`powertools-large-messages`) documentation.
7+
This module is now deprecated and will be removed in version 2.
8+
See [Large Message Handling](large_messages.md) and
9+
[the migration guide](http://localhost:8000/lambda-java/utilities/large_messages/#migration-from-the-sqs-large-message-utility)
10+
for the new module (`powertools-large-messages`) documentation
911

1012
The large message handling utility handles SQS messages which have had their payloads
1113
offloaded to S3 due to them being larger than the SQS maximum.

examples/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ Each example can be copied from its subdirectory and used independently of the r
99
* [powertools-examples-idempotency](powertools-examples-idempotency) - An idempotent HTTP API
1010
* [powertools-examples-parameters](powertools-examples-parameters) - Uses the parameters module to provide runtime parameters to a function
1111
* [powertools-examples-serialization](powertools-examples-serialization) - Uses the serialization module to serialize and deserialize API Gateway & SQS payloads
12-
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests
12+
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests (**Deprecated** - will be replaced by `powertools-examples-batch` in version 2 of this library)
1313
* [powertools-examples-validation](powertools-examples-validation) - Uses the validation module to validate user requests received via API Gateway
1414
* [powertools-examples-cloudformation](powertools-examples-cloudformation) - Deploys a Cloudformation custom resource
15+
* [powertools-examples-batch](powertools-examples-batch) - Examples for each of the different batch processing deployments
1516

1617
## Working with AWS Serverless Application Model (SAM) Examples
1718
Many of the examples use [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (SAM). To get started

examples/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
<module>powertools-examples-parameters</module>
3535
<module>powertools-examples-serialization</module>
3636
<module>powertools-examples-sqs</module>
37+
<module>powertools-examples-batch</module>
3738
<module>powertools-examples-validation</module>
3839
<module>powertools-examples-cloudformation</module>
3940
</modules>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Powertools for AWS Lambda (Java) - Batch Example
2+
3+
This project contains examples of Lambda function using the batch processing module of Powertools for AWS Lambda (Java).
4+
For more information on this module, please refer to the
5+
[documentation](https://docs.powertools.aws.dev/lambda-java/utilities/batch/).
6+
7+
Three different examples and SAM deployments are included, covering each of the batch sources:
8+
9+
* [SQS](src/main/java/org/demo/batch/sqs) - SQS batch processing
10+
* [Kinesis Streams](src/main/java/org/demo/batch/kinesis) - Kinesis Streams batch processing
11+
* [DynamoDB Streams](src/main/java/org/demo/batch/dynamo) - DynamoDB Streams batch processing
12+
13+
## Deploy the sample application
14+
15+
This sample is based on Serverless Application Model (SAM). To deploy it, check out the instructions for getting
16+
started with SAM in [the examples directory](../README.md)
17+
18+
This sample contains three different deployments, depending on which batch processor you'd like to use, you can
19+
change to the subdirectory containing the example SAM template, and deploy. For instance, for the SQS batch
20+
deployment:
21+
22+
```bash
23+
cd deploy/sqs
24+
sam build
25+
sam deploy --guided
26+
```
27+
28+
## Test the application
29+
30+
Each of the examples uses a Lambda scheduled every 5 minutes to push a batch, and a separate lambda to read it. To
31+
see this in action, we can simply tail the logs of our stack:
32+
33+
```bash
34+
sam logs --tail $STACK_NAME
35+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Transform: AWS::Serverless-2016-10-31
3+
Description: >
4+
DynamoDB Streams batch processing demo
5+
6+
Globals:
7+
Function:
8+
Timeout: 20
9+
Runtime: java11
10+
MemorySize: 512
11+
Tracing: Active
12+
Architectures:
13+
- x86_64
14+
Environment:
15+
Variables:
16+
POWERTOOLS_LOG_LEVEL: INFO
17+
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
18+
POWERTOOLS_LOGGER_LOG_EVENT: true
19+
20+
Resources:
21+
DynamoDBTable:
22+
Type: AWS::DynamoDB::Table
23+
Properties:
24+
AttributeDefinitions:
25+
- AttributeName: id
26+
AttributeType: S
27+
KeySchema:
28+
- AttributeName: id
29+
KeyType: HASH
30+
ProvisionedThroughput:
31+
ReadCapacityUnits: 5
32+
WriteCapacityUnits: 5
33+
StreamSpecification:
34+
StreamViewType: NEW_IMAGE
35+
36+
37+
DemoDynamoDBWriter:
38+
Type: AWS::Serverless::Function
39+
Properties:
40+
CodeUri: ../..
41+
Handler: org.demo.batch.dynamo.DynamoDBWriter::handleRequest
42+
Environment:
43+
Variables:
44+
POWERTOOLS_SERVICE_NAME: ddbstreams-demo
45+
TABLE_NAME: !Ref DynamoDBTable
46+
Policies:
47+
- DynamoDBCrudPolicy:
48+
TableName: !Ref DynamoDBTable
49+
Events:
50+
CWSchedule:
51+
Type: Schedule
52+
Properties:
53+
Schedule: 'rate(1 minute)'
54+
Name: !Join [ "-", [ "ddb-writer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
55+
Description: Write records to DynamoDB via a Lambda function
56+
Enabled: true
57+
58+
DemoDynamoDBStreamsConsumerFunction:
59+
Type: AWS::Serverless::Function
60+
Properties:
61+
CodeUri: ../..
62+
Handler: org.demo.batch.dynamo.DynamoDBStreamBatchHandler::handleRequest
63+
Environment:
64+
Variables:
65+
POWERTOOLS_SERVICE_NAME: ddbstreams-batch-demo
66+
Policies: AWSLambdaDynamoDBExecutionRole
67+
Events:
68+
Stream:
69+
Type: DynamoDB
70+
Properties:
71+
Stream: !GetAtt DynamoDBTable.StreamArn
72+
BatchSize: 100
73+
StartingPosition: TRIM_HORIZON
74+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Transform: AWS::Serverless-2016-10-31
3+
Description: >
4+
Kinesis batch processing demo
5+
6+
Globals:
7+
Function:
8+
Timeout: 20
9+
Runtime: java11
10+
MemorySize: 512
11+
Tracing: Active
12+
Environment:
13+
Variables:
14+
POWERTOOLS_LOG_LEVEL: INFO
15+
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
16+
POWERTOOLS_LOGGER_LOG_EVENT: true
17+
18+
Resources:
19+
20+
DemoKinesisStream:
21+
Type: AWS::Kinesis::Stream
22+
Properties:
23+
ShardCount: 1
24+
25+
StreamConsumer:
26+
Type: "AWS::Kinesis::StreamConsumer"
27+
Properties:
28+
StreamARN: !GetAtt DemoKinesisStream.Arn
29+
ConsumerName: KinesisBatchHandlerConsumer
30+
31+
DemoKinesisSenderFunction:
32+
Type: AWS::Serverless::Function
33+
Properties:
34+
CodeUri: ../..
35+
Handler: org.demo.batch.kinesis.KinesisBatchSender::handleRequest
36+
Environment:
37+
Variables:
38+
POWERTOOLS_SERVICE_NAME: kinesis-batch-demo
39+
STREAM_NAME: !Ref DemoKinesisStream
40+
Policies:
41+
- Statement:
42+
- Sid: WriteToKinesis
43+
Effect: Allow
44+
Action:
45+
- kinesis:PutRecords
46+
- kinesis:DescribeStream
47+
Resource: !GetAtt DemoKinesisStream.Arn
48+
Events:
49+
CWSchedule:
50+
Type: Schedule
51+
Properties:
52+
Schedule: 'rate(5 minutes)'
53+
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
54+
Description: Produce message to Kinesis via a Lambda function
55+
Enabled: true
56+
57+
DemoKinesisConsumerFunction:
58+
Type: AWS::Serverless::Function
59+
Properties:
60+
CodeUri: ../..
61+
Handler: org.demo.batch.kinesis.KinesisBatchHandler::handleRequest
62+
Environment:
63+
Variables:
64+
POWERTOOLS_SERVICE_NAME: kinesis-demo
65+
Events:
66+
Kinesis:
67+
Type: Kinesis
68+
Properties:
69+
Stream: !GetAtt StreamConsumer.ConsumerARN
70+
StartingPosition: LATEST
71+
BatchSize: 2
72+
73+
Outputs:
74+
DemoKinesisQueue:
75+
Description: "ARN for Kinesis Stream"
76+
Value: !GetAtt DemoKinesisStream.Arn
77+
DemoKinesisSenderFunction:
78+
Description: "Kinesis Batch Sender - Lambda Function ARN"
79+
Value: !GetAtt DemoKinesisSenderFunction.Arn
80+
DemoSQSConsumerFunction:
81+
Description: "SQS Batch Handler - Lambda Function ARN"
82+
Value: !GetAtt DemoKinesisConsumerFunction.Arn
83+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Transform: AWS::Serverless-2016-10-31
3+
Description: >
4+
sqs batch processing demo
5+
6+
Globals:
7+
Function:
8+
Timeout: 20
9+
Runtime: java11
10+
MemorySize: 512
11+
Tracing: Active
12+
Environment:
13+
Variables:
14+
POWERTOOLS_LOG_LEVEL: INFO
15+
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
16+
POWERTOOLS_LOGGER_LOG_EVENT: true
17+
18+
Resources:
19+
CustomerKey:
20+
Type: AWS::KMS::Key
21+
Properties:
22+
Description: KMS key for encrypted queues
23+
Enabled: true
24+
KeyPolicy:
25+
Version: '2012-10-17'
26+
Statement:
27+
- Sid: Enable IAM User Permissions
28+
Effect: Allow
29+
Principal:
30+
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
31+
Action: 'kms:*'
32+
Resource: '*'
33+
- Sid: Allow use of the key
34+
Effect: Allow
35+
Principal:
36+
Service: lambda.amazonaws.com
37+
Action:
38+
- kms:Decrypt
39+
- kms:GenerateDataKey
40+
Resource: '*'
41+
42+
CustomerKeyAlias:
43+
Type: AWS::KMS::Alias
44+
Properties:
45+
AliasName: alias/powertools-batch-sqs-demo
46+
TargetKeyId: !Ref CustomerKey
47+
48+
DemoDlqSqsQueue:
49+
Type: AWS::SQS::Queue
50+
Properties:
51+
KmsMasterKeyId: !Ref CustomerKey
52+
53+
DemoSqsQueue:
54+
Type: AWS::SQS::Queue
55+
Properties:
56+
RedrivePolicy:
57+
deadLetterTargetArn:
58+
Fn::GetAtt:
59+
- "DemoDlqSqsQueue"
60+
- "Arn"
61+
maxReceiveCount: 2
62+
KmsMasterKeyId: !Ref CustomerKey
63+
64+
DemoSQSSenderFunction:
65+
Type: AWS::Serverless::Function
66+
Properties:
67+
CodeUri: ../..
68+
Handler: org.demo.batch.sqs.SqsBatchSender::handleRequest
69+
Environment:
70+
Variables:
71+
POWERTOOLS_SERVICE_NAME: sqs-batch-demo
72+
QUEUE_URL: !Ref DemoSqsQueue
73+
Policies:
74+
- Statement:
75+
- Sid: SQSSendMessageBatch
76+
Effect: Allow
77+
Action:
78+
- sqs:SendMessageBatch
79+
- sqs:SendMessage
80+
Resource: !GetAtt DemoSqsQueue.Arn
81+
- Sid: SQSKMSKey
82+
Effect: Allow
83+
Action:
84+
- kms:GenerateDataKey
85+
- kms:Decrypt
86+
Resource: !GetAtt CustomerKey.Arn
87+
Events:
88+
CWSchedule:
89+
Type: Schedule
90+
Properties:
91+
Schedule: 'rate(5 minutes)'
92+
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
93+
Description: Produce message to SQS via a Lambda function
94+
Enabled: true
95+
96+
DemoSQSConsumerFunction:
97+
Type: AWS::Serverless::Function
98+
Properties:
99+
CodeUri: ../..
100+
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest
101+
Environment:
102+
Variables:
103+
POWERTOOLS_SERVICE_NAME: sqs-demo
104+
Policies:
105+
- Statement:
106+
- Sid: SQSDeleteGetAttribute
107+
Effect: Allow
108+
Action:
109+
- sqs:DeleteMessageBatch
110+
- sqs:GetQueueAttributes
111+
Resource: !GetAtt DemoSqsQueue.Arn
112+
- Sid: SQSSendMessageBatch
113+
Effect: Allow
114+
Action:
115+
- sqs:SendMessageBatch
116+
- sqs:SendMessage
117+
Resource: !GetAtt DemoDlqSqsQueue.Arn
118+
- Sid: SQSKMSKey
119+
Effect: Allow
120+
Action:
121+
- kms:GenerateDataKey
122+
- kms:Decrypt
123+
Resource: !GetAtt CustomerKey.Arn
124+
Events:
125+
MySQSEvent:
126+
Type: SQS
127+
Properties:
128+
Queue: !GetAtt DemoSqsQueue.Arn
129+
BatchSize: 2
130+
MaximumBatchingWindowInSeconds: 300
131+
132+
Outputs:
133+
DemoSqsQueue:
134+
Description: "ARN for main SQS queue"
135+
Value: !GetAtt DemoSqsQueue.Arn
136+
DemoDlqSqsQueue:
137+
Description: "ARN for DLQ"
138+
Value: !GetAtt DemoDlqSqsQueue.Arn
139+
DemoSQSSenderFunction:
140+
Description: "SQS Batch Sender - Lambda Function ARN"
141+
Value: !GetAtt DemoSQSSenderFunction.Arn
142+
DemoSQSConsumerFunction:
143+
Description: "SQS Batch Handler - Lambda Function ARN"
144+
Value: !GetAtt DemoSQSConsumerFunction.Arn
145+
DemoSQSConsumerFunctionRole:
146+
Description: "Implicit IAM Role created for SQS Lambda Function ARN"
147+
Value: !GetAtt DemoSQSConsumerFunctionRole.Arn

0 commit comments

Comments
 (0)