Skip to content

Commit c5be568

Browse files
committed
sqs // batch processing example
1 parent 1bbad5b commit c5be568

File tree

8 files changed

+272
-30
lines changed

8 files changed

+272
-30
lines changed

examples/powertools-examples-batch/deploy/sqs/template.yml

+58-5
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ Globals:
77
Function:
88
Timeout: 20
99
Runtime: java11
10-
MemorySize: 512
11-
Tracing: Active
10+
MemorySize: 5400
1211
Environment:
1312
Variables:
1413
POWERTOOLS_LOG_LEVEL: INFO
15-
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
1614
POWERTOOLS_LOGGER_LOG_EVENT: true
1715

1816
Resources:
@@ -45,6 +43,9 @@ Resources:
4543
AliasName: alias/powertools-batch-sqs-demo
4644
TargetKeyId: !Ref CustomerKey
4745

46+
Bucket:
47+
Type: AWS::S3::Bucket
48+
4849
DemoDlqSqsQueue:
4950
Type: AWS::SQS::Queue
5051
Properties:
@@ -96,11 +97,57 @@ Resources:
9697
DemoSQSConsumerFunction:
9798
Type: AWS::Serverless::Function
9899
Properties:
100+
Tracing: Active
99101
CodeUri: ../..
100102
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest
101103
Environment:
102104
Variables:
103105
POWERTOOLS_SERVICE_NAME: sqs-demo
106+
BUCKET: !Ref Bucket
107+
Policies:
108+
- Statement:
109+
- Sid: SQSDeleteGetAttribute
110+
Effect: Allow
111+
Action:
112+
- sqs:DeleteMessageBatch
113+
- sqs:GetQueueAttributes
114+
Resource: !GetAtt DemoSqsQueue.Arn
115+
- Sid: SQSSendMessageBatch
116+
Effect: Allow
117+
Action:
118+
- sqs:SendMessageBatch
119+
- sqs:SendMessage
120+
Resource: !GetAtt DemoDlqSqsQueue.Arn
121+
- Sid: SQSKMSKey
122+
Effect: Allow
123+
Action:
124+
- kms:GenerateDataKey
125+
- kms:Decrypt
126+
Resource: !GetAtt CustomerKey.Arn
127+
- Sid: WriteToS3
128+
Effect: Allow
129+
Action:
130+
- s3:PutObject
131+
Resource: !Sub ${Bucket.Arn}/*
132+
133+
# Events:
134+
# MySQSEvent:
135+
# Type: SQS
136+
# Properties:
137+
# Queue: !GetAtt DemoSqsQueue.Arn
138+
# BatchSize: 100
139+
# MaximumBatchingWindowInSeconds: 60
140+
141+
DemoSQSParallelConsumerFunction:
142+
Type: AWS::Serverless::Function
143+
Properties:
144+
Tracing: Active
145+
CodeUri: ../..
146+
Handler: org.demo.batch.sqs.SqsParallelBatchHandler::handleRequest
147+
Environment:
148+
Variables:
149+
POWERTOOLS_SERVICE_NAME: sqs-demo
150+
BUCKET: !Ref Bucket
104151
Policies:
105152
- Statement:
106153
- Sid: SQSDeleteGetAttribute
@@ -121,13 +168,19 @@ Resources:
121168
- kms:GenerateDataKey
122169
- kms:Decrypt
123170
Resource: !GetAtt CustomerKey.Arn
171+
- Sid: WriteToS3
172+
Effect: Allow
173+
Action:
174+
- s3:PutObject
175+
Resource: !Sub ${Bucket.Arn}/*
176+
124177
Events:
125178
MySQSEvent:
126179
Type: SQS
127180
Properties:
128181
Queue: !GetAtt DemoSqsQueue.Arn
129-
BatchSize: 2
130-
MaximumBatchingWindowInSeconds: 300
182+
BatchSize: 100
183+
MaximumBatchingWindowInSeconds: 60
131184

132185
Outputs:
133186
DemoSqsQueue:

examples/powertools-examples-batch/pom.xml

+12-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<maven.compiler.source>11</maven.compiler.source>
1515
<maven.compiler.target>11</maven.compiler.target>
1616
<aspectj.version>1.9.20.1</aspectj.version>
17-
<sdk.version>2.24.10</sdk.version>
17+
<sdk.version>2.25.21</sdk.version>
1818
</properties>
1919

2020
<dependencies>
@@ -42,6 +42,17 @@
4242
<groupId>software.amazon.awssdk</groupId>
4343
<artifactId>sdk-core</artifactId>
4444
<version>${sdk.version}</version>
45+
<exclusions>
46+
<exclusion>
47+
<groupId>org.slf4j</groupId>
48+
<artifactId>slf4j-api</artifactId>
49+
</exclusion>
50+
</exclusions>
51+
</dependency>
52+
<dependency>
53+
<groupId>software.amazon.awssdk</groupId>
54+
<artifactId>s3</artifactId>
55+
<version>${sdk.version}</version>
4556
</dependency>
4657
<dependency>
4758
<groupId>software.amazon.awssdk</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package org.demo.batch.sqs;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import java.io.File;
20+
import java.io.IOException;
21+
import java.util.Arrays;
22+
import java.util.Random;
23+
import org.demo.batch.model.Product;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import software.amazon.awssdk.core.sync.RequestBody;
27+
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
28+
import software.amazon.awssdk.services.s3.S3Client;
29+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
30+
import software.amazon.lambda.powertools.tracing.Tracing;
31+
import software.amazon.lambda.powertools.tracing.TracingUtils;
32+
33+
public class AbstractSqsBatchHandler {
34+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSqsBatchHandler.class);
35+
private final ObjectMapper mapper = new ObjectMapper();
36+
private final String bucket = System.getenv("BUCKET");
37+
private final S3Client s3 = S3Client.builder().httpClient(UrlConnectionHttpClient.create()).build();
38+
private final Random r = new Random();
39+
40+
/**
41+
* Simulate some processing (I/O + S3 put request)
42+
* @param p deserialized product
43+
* @param context Lambda context
44+
*/
45+
@Tracing
46+
protected void processMessage(Product p, Context context) {
47+
TracingUtils.putAnnotation("productId", p.getId());
48+
LOGGER.info("Processing product {}", p);
49+
char c = (char)(r.nextInt(26) + 'a');
50+
char[] chars = new char[1024 * 1000];
51+
Arrays.fill(chars, c);
52+
p.setName(new String(chars));
53+
try {
54+
File file = new File("/tmp/"+p.getId()+".json");
55+
mapper.writeValue(file, p);
56+
s3.putObject(
57+
PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file));
58+
} catch (IOException e) {
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
}

examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import com.amazonaws.services.lambda.runtime.RequestHandler;
55
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
66
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
7+
import org.demo.batch.model.Product;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
9-
import org.demo.batch.model.Product;
1010
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
1111
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
12+
import software.amazon.lambda.powertools.logging.Logging;
13+
import software.amazon.lambda.powertools.tracing.Tracing;
1214

13-
public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
15+
public class SqsBatchHandler extends AbstractSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
1416
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandler.class);
1517
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
1618

@@ -20,14 +22,11 @@ public SqsBatchHandler() {
2022
.buildWithMessageHandler(this::processMessage, Product.class);
2123
}
2224

25+
@Logging
26+
@Tracing
2327
@Override
2428
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
29+
LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size());
2530
return handler.processBatch(sqsEvent, context);
2631
}
27-
28-
29-
private void processMessage(Product p, Context c) {
30-
LOGGER.info("Processing product " + p);
31-
}
32-
3332
}

examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java

+10-15
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@
1010
import java.security.SecureRandom;
1111
import java.util.List;
1212
import java.util.stream.IntStream;
13+
import org.demo.batch.model.Product;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
15-
import org.demo.batch.model.Product;
1616
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
1717
import software.amazon.awssdk.services.sqs.SqsClient;
1818
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
1919
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
20-
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
2120

2221

2322
/**
@@ -45,16 +44,12 @@ public SqsBatchSender() {
4544
public String handleRequest(ScheduledEvent scheduledEvent, Context context) {
4645
String queueUrl = System.getenv("QUEUE_URL");
4746

48-
LOGGER.info("handleRequest");
49-
50-
// Push 5 messages on each invoke.
51-
List<SendMessageBatchRequestEntry> batchRequestEntries = IntStream.range(0, 5)
47+
List<SendMessageBatchRequestEntry> batchRequestEntries = IntStream.range(0, 50)
5248
.mapToObj(value -> {
53-
long id = random.nextLong();
54-
float price = random.nextFloat();
49+
long id = Math.abs(random.nextLong());
50+
float price = Math.abs(random.nextFloat() * 3465);
5551
Product product = new Product(id, "product-" + id, price);
5652
try {
57-
5853
return SendMessageBatchRequestEntry.builder()
5954
.id(scheduledEvent.getId() + value)
6055
.messageBody(objectMapper.writeValueAsString(product))
@@ -65,12 +60,12 @@ public String handleRequest(ScheduledEvent scheduledEvent, Context context) {
6560
}
6661
}).collect(toList());
6762

68-
SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder()
69-
.queueUrl(queueUrl)
70-
.entries(batchRequestEntries)
71-
.build());
72-
73-
LOGGER.info("Sent Message {}", sendMessageBatchResponse);
63+
for (int i = 0; i < 50; i += 10) {
64+
sqsClient.sendMessageBatch(SendMessageBatchRequest.builder()
65+
.queueUrl(queueUrl)
66+
.entries(batchRequestEntries.subList(i, i + 10))
67+
.build());
68+
}
7469

7570
return "Success";
7671
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package org.demo.batch.sqs;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.RequestHandler;
19+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
20+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
21+
import org.demo.batch.model.Product;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
25+
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
26+
import software.amazon.lambda.powertools.logging.Logging;
27+
import software.amazon.lambda.powertools.tracing.Tracing;
28+
29+
public class SqsParallelBatchHandler extends AbstractSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
30+
private static final Logger LOGGER = LoggerFactory.getLogger(SqsParallelBatchHandler.class);
31+
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
32+
33+
public SqsParallelBatchHandler() {
34+
handler = new BatchMessageHandlerBuilder()
35+
.withSqsBatchHandler()
36+
.buildWithMessageHandler(this::processMessage, Product.class);
37+
}
38+
39+
@Logging
40+
@Tracing
41+
@Override
42+
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
43+
LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size());
44+
return handler.processBatchInParallel(sqsEvent, context);
45+
}
46+
}

0 commit comments

Comments
 (0)