Skip to content

feat: Add Batch Processor module #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 95 commits into from
Aug 4, 2023
Merged
Changes from 1 commit
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
2c20023
Starting to sketch out shape of API for batch processor
scottgerring Jun 21, 2023
63ebcab
Merge branch 'main' into rfc-batch-processor
scottgerring Jun 22, 2023
c34f5d9
Merge branch 'main' into rfc-batch-processor
Jun 25, 2023
870ec47
Variant 1
Jun 25, 2023
5469287
Some more examples
Jun 25, 2023
0435d18
Add extra bit for handling message-specific mutation
Jun 25, 2023
6241014
Make clear what's not public
Jun 25, 2023
8168719
test with interfaces
jeromevdl Jun 28, 2023
b0675b7
move tests
jeromevdl Jun 28, 2023
5ae16f9
refactoring a bit
jeromevdl Jun 29, 2023
3127096
refactoring and adding FIFO
jeromevdl Jun 30, 2023
9b8a310
refactoring and adding FIFO
jeromevdl Jun 30, 2023
12cb97f
adding FIFO management
jeromevdl Jul 1, 2023
061bfb1
cleanup
jeromevdl Jul 1, 2023
6b07f78
add javadoc
jeromevdl Jul 3, 2023
7d2e1fb
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 3, 2023
b69c09b
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 11, 2023
cc28ce5
Flesh out builder option a bit
scottgerring Jul 11, 2023
916c26f
Flesh out a bit more
scottgerring Jul 11, 2023
96c30ff
more changes
scottgerring Jul 11, 2023
ee64d62
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 19, 2023
26d8da5
Leaning into the builder style. needs some more thought
scottgerring Jul 19, 2023
a9517f6
The shape of it is rightish
scottgerring Jul 20, 2023
d3ad219
Working working
scottgerring Jul 20, 2023
418e32b
Work
scottgerring Jul 20, 2023
a1e441c
Work on kinesis batch handler
scottgerring Jul 24, 2023
28d1f8d
More tests
scottgerring Jul 24, 2023
a660a8b
More tests and starting to add an example
scottgerring Jul 24, 2023
2332e2d
Working on batch
scottgerring Jul 25, 2023
4af594c
feat(batch): initial DdbBatchMessageHandler implementation
mriccia Jul 25, 2023
34a58e6
more
scottgerring Jul 26, 2023
ed161e9
fix pom.xml for powertools-examples-batch
mriccia Jul 26, 2023
f8812be
Add dynamodb example
mriccia Jul 26, 2023
53a5abe
Move template into subdir
scottgerring Jul 26, 2023
9872410
Better structure
scottgerring Jul 26, 2023
e53c1e9
tidy up
mriccia Jul 26, 2023
4ca1726
Trying to get kinesis going
scottgerring Jul 26, 2023
fb422da
Kinesis demo working
scottgerring Jul 26, 2023
a862daf
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 26, 2023
b8af8de
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 26, 2023
4118768
Updated readme
scottgerring Jul 26, 2023
e75de66
Deprecated everywhere
scottgerring Jul 26, 2023
1e7d305
Address initial review comments
scottgerring Jul 27, 2023
6ad7e7d
Add success tests for Kinesis/S3
scottgerring Jul 27, 2023
8f69551
Increase DDB coverage
scottgerring Jul 27, 2023
672ba51
Tell sonar to ignore dupes in examples
scottgerring Jul 27, 2023
70a08ba
Add docs
scottgerring Jul 27, 2023
dae6131
Add warning
scottgerring Jul 27, 2023
5e4d709
Merge remote-tracking branch 'origin/main' into rfc-batch-processor
scottgerring Jul 27, 2023
6d842da
More doco
scottgerring Jul 27, 2023
9de7d0a
Format
scottgerring Jul 27, 2023
0e95238
Docs good
scottgerring Jul 27, 2023
ba17efd
Disabling formatting check for now as its breaking the build and I ca…
scottgerring Jul 27, 2023
e87e6dc
Make checkstyle happy
scottgerring Jul 27, 2023
5c046a6
Add docs from heitor
scottgerring Jul 27, 2023
56af4d0
More docs changes
scottgerring Jul 28, 2023
e0fd524
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 28, 2023
c43df12
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 2, 2023
5d8be7a
Merged
scottgerring Aug 2, 2023
033a922
move ddb template in the right folder
mriccia Aug 2, 2023
13baa6a
Changes
scottgerring Aug 2, 2023
ca6cd09
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
00fe0b0
add items updates and deletions to ddb example
mriccia Aug 2, 2023
1f65ceb
Will it blend?
scottgerring Aug 2, 2023
ebc7630
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
bb74fe4
More changes
scottgerring Aug 2, 2023
76e51a6
e2e test handler
mriccia Aug 2, 2023
0e4b018
Try work for SQS only
scottgerring Aug 2, 2023
f8c9802
Merged
scottgerring Aug 2, 2023
c641d4b
More greatness
scottgerring Aug 2, 2023
0dbf1a3
Almost good
scottgerring Aug 2, 2023
7834a6e
SQS works
scottgerring Aug 2, 2023
062870f
Also kinesis e2e
scottgerring Aug 2, 2023
5548dc0
Lets try doing it with streams
scottgerring Aug 2, 2023
3cf034a
Try make it work with streams
scottgerring Aug 2, 2023
55cbbf4
Streams?
scottgerring Aug 3, 2023
6e44fe6
Make SQS test work
scottgerring Aug 3, 2023
0c9f8b8
SQS and Kinesis work
scottgerring Aug 3, 2023
4b13467
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 3, 2023
006b9e0
DynamoDB E2E works
scottgerring Aug 3, 2023
c3ae363
Formatting
scottgerring Aug 3, 2023
570d051
Try exclude e2e-tests from dupe checking
scottgerring Aug 3, 2023
6303a75
Rename sonar file
scottgerring Aug 3, 2023
f91c2c1
Formatting
scottgerring Aug 3, 2023
db0e7f1
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
fa9aa11
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
710a94c
Address review comments
scottgerring Aug 4, 2023
06373a7
Merge
scottgerring Aug 4, 2023
294313f
Missed one
scottgerring Aug 4, 2023
c63fb28
Formatting
scottgerring Aug 4, 2023
e27a31f
Cleanup doc linking
scottgerring Aug 4, 2023
0929fb1
More doco
scottgerring Aug 4, 2023
3d8aca8
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
1ff4658
Update batch.md
scottgerring Aug 4, 2023
83fdf12
Skip aspectj run
scottgerring Aug 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 103 additions & 12 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.

!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."

## Processing messages from SQS
### Processing messages from SQS

=== "App.java"
=== "SQSBatchHandler"

```java hl_lines="10 13-15 20 25"
import com.amazonaws.services.lambda.runtime.Context;
Expand Down Expand Up @@ -159,7 +159,7 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

=== "Product.java"
=== "SQS Product"

```java
public class Product {
Expand Down Expand Up @@ -204,7 +204,7 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

=== "Example Event"
=== "SQS Example Event"

```json
{
Expand Down Expand Up @@ -246,9 +246,9 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

## Processing messages from Kinesis Streams
### Processing messages from Kinesis Streams

=== "App.java"
=== "KinesisBatchHandler"

```java hl_lines="10 13-15 20 24"
import com.amazonaws.services.lambda.runtime.Context;
Expand Down Expand Up @@ -280,7 +280,7 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

=== "Product.java"
=== "Kinesis Product"

```java
public class Product {
Expand Down Expand Up @@ -325,7 +325,7 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

=== "Example Event"
=== "Kinesis Example Event"

```json
{
Expand Down Expand Up @@ -367,9 +367,9 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
]
}
```
## Processing messages from DynamoDB Streams
### Processing messages from DynamoDB Streams

=== "App.java"
=== "DynamoDBStreamBatchHandler"

```java hl_lines="10 13-15 20 24"
import com.amazonaws.services.lambda.runtime.Context;
Expand Down Expand Up @@ -400,7 +400,7 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

=== "Example Event"
=== "DynamoDB Example Event"

```json
{
Expand Down Expand Up @@ -473,4 +473,95 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
]
}
```
```


## Handling Messages

### Raw message and deserialized message handlers
You must provide either a raw message handler, or a deserialized message handler. The raw message handler receives
the envelope record type relevant for the particular event source - for instance, the SQS event source provides
[SQSMessage](https://javadoc.io/doc/com.amazonaws/aws-lambda-java-events/2.2.2/com/amazonaws/services/lambda/runtime/events/SQSEvent.html)
instances. The deserialized message handler extracts the body from this envelope, and deserializes it to a user-defined
type.

In general, the deserialized message handler should be used unless you need access to information on the envelope.

=== "Raw Message Handler"

```java
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
.buildWithRawMessageHandler(this::processRawMessage);
}

private void processRawMessage(SQSEvent.SQSMessage sqsMessage) {
// Do something with the raw message
}

```

=== "Deserialized Message Handler"

```java
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
.buildWitMessageHandler(this::processRawMessage, Product.class);
}

private void processMessage(Product product) {
// Do something with the raw message
}

```

### Success and failure handlers

You can register a success or failure handler which will be invoked as each message is processed by the batch
module.

Handlers can be provided when building the batch processor and are available for all event sources.
For instance for DynamoDB:

```java
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.withSuccessHandler((m) -> {
// Success handler receives the raw message
LOGGER.info("Message with sequenceNumber {} was successfully processed",
m.getDynamodb().getSequenceNumber());
})
.withFailureHandler((m, e) -> {
// Failure handler receives the raw message and the exception thrown
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
, e.getDynamodb().getSequenceNumber(), e);
})
.buildWithMessageHander(this::processMessage);
```

!!! info
If the success handler throws an exception, the item it is processing will be marked as failed by the
batch processor.
If the failure handler throws, the batch processing will continue; the item it is processing has
already been marked as failed.


### Lambda Context

Both raw and deserialized message handlers can choose to take the Lambda context as an argument if they
need it, or not:

```java
public class ClassWithHandlers {

private void processMessage(Product product) {
// Do something with the raw message
}

private void processMessageWithContext(Product product, Context context) {
// Do something with the raw message and the lambda Context
}
}
```