Skip to content

Commit 006b9e0

Browse files
committed
DynamoDB E2E works
1 parent 4b13467 commit 006b9e0

File tree

7 files changed

+102
-55
lines changed

7 files changed

+102
-55
lines changed

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
6060
String sequenceNumber = record.getDynamodb().getSequenceNumber();
6161
LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
6262
sequenceNumber, t.getMessage());
63+
LOGGER.error("Error was", t);
6364
batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber));
6465

6566
// Report failure if we have a handler

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java

+5
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
7373
this.successHandler.accept(record);
7474
}
7575
} catch (Throwable t) {
76+
String sequenceNumber = record.getEventID();
77+
LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures",
78+
sequenceNumber, t.getMessage());
79+
LOGGER.error("Error was", t);
80+
7681
batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber()));
7782

7883
// Report failure if we have a handler

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
8686
} catch (Throwable t) {
8787
LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures",
8888
message.getMessageId(), t.getMessage());
89+
LOGGER.error("Error was", t);
90+
8991
response.getBatchItemFailures()
9092
.add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId())
9193
.build());

powertools-e2e-tests/handlers/batch/src/main/java/software/amazon/lambda/powertools/e2e/Function.java

+35-16
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.OutputStreamWriter;
3838
import java.nio.charset.StandardCharsets;
3939
import java.util.HashMap;
40+
import java.util.Map;
4041
import java.util.stream.Collectors;
4142
import org.apache.logging.log4j.LogManager;
4243
import org.apache.logging.log4j.Logger;
@@ -50,6 +51,8 @@
5051
import software.amazon.lambda.powertools.logging.Logging;
5152
import software.amazon.lambda.powertools.utilities.JsonConfig;
5253

54+
import javax.management.Attribute;
55+
5356

5457
public class Function implements RequestHandler<InputStream, Object> {
5558

@@ -83,30 +86,46 @@ private void processProductMessage(Product p, Context c) {
8386
// TODO - write product details to output table
8487
ddbClient = DynamoDbClient.builder()
8588
.build();
89+
Map<String, AttributeValue> results = new HashMap<>();
90+
results.put("functionName", AttributeValue.builder()
91+
.s(c.getFunctionName())
92+
.build());
93+
results.put("id", AttributeValue.builder()
94+
.s(Long.toString(p.getId()))
95+
.build());
96+
results.put("name", AttributeValue.builder()
97+
.s(p.getName())
98+
.build());
99+
results.put("price", AttributeValue.builder()
100+
.n(Double.toString(p.getPrice()))
101+
.build());
86102
ddbClient.putItem(PutItemRequest.builder()
87103
.tableName(ddbOutputTable)
88-
.item(new HashMap<String, AttributeValue>() {
89-
{
90-
put("functionName", AttributeValue.builder()
91-
.s(c.getFunctionName())
92-
.build());
93-
put("id", AttributeValue.builder()
94-
.s(Long.toString(p.getId()))
95-
.build());
96-
put("name", AttributeValue.builder()
97-
.s(p.getName())
98-
.build());
99-
put("price", AttributeValue.builder()
100-
.n(Double.toString(p.getPrice()))
101-
.build());
102-
}})
104+
.item(results)
103105
.build());
104106
}
105107

106108
private void processDdbMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
107109
LOGGER.info("Processing DynamoDB Stream Record" + dynamodbStreamRecord);
108110

109-
// TODO write DDB change details to batch output table
111+
ddbClient = DynamoDbClient.builder()
112+
.build();
113+
114+
String id = dynamodbStreamRecord.getDynamodb().getKeys().get("id").getS();
115+
LOGGER.info("Incoming ID is " + id);
116+
117+
Map<String, AttributeValue> results = new HashMap<>();
118+
results.put("functionName", AttributeValue.builder()
119+
.s(context.getFunctionName())
120+
.build());
121+
results.put("id", AttributeValue.builder()
122+
.s(id)
123+
.build());
124+
125+
ddbClient.putItem(PutItemRequest.builder()
126+
.tableName(ddbOutputTable)
127+
.item(results)
128+
.build());
110129
}
111130

112131
public Object createResult(String input, Context context) {

powertools-e2e-tests/handlers/batch/src/test/java/TestSerialization.java

-29
This file was deleted.

powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/BatchE2ET.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@
4242
import software.amazon.awssdk.regions.Region;
4343
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
4444
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
45-
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
46-
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
47-
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
48-
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
45+
import software.amazon.awssdk.services.dynamodb.model.*;
4946
import software.amazon.awssdk.services.kinesis.KinesisClient;
5047
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
5148
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
@@ -72,6 +69,7 @@ public class BatchE2ET {
7269
private static DynamoDbClient ddbClient;
7370
private static SqsClient sqsClient;
7471
private static KinesisClient kinesisClient;
72+
private static String ddbStreamsTestTable;
7573
private final List<Product> testProducts;
7674

7775
@BeforeAll
@@ -80,13 +78,16 @@ public static void setup() {
8078
String random = UUID.randomUUID().toString().substring(0, 6);
8179
String queueName = "batchqueue" + random;
8280
kinesisStreamName = "batchstream" + random;
81+
ddbStreamsTestTable = "ddbstreams" + random;
82+
8383
objectMapper = JsonConfig.get().getObjectMapper();
8484

8585
infrastructure = Infrastructure.builder()
8686
.testName(BatchE2ET.class.getSimpleName())
8787
.pathToFunction("batch")
8888
.idempotencyTable("idempo" + random)
8989
.queue(queueName)
90+
.ddbStreamsTableName(ddbStreamsTestTable)
9091
.kinesisStream(kinesisStreamName)
9192
.build();
9293

@@ -95,6 +96,7 @@ public static void setup() {
9596
queueUrl = outputs.get("QueueURL");
9697
kinesisStreamName = outputs.get("KinesisStreamName");
9798
outputTable = outputs.get("TableNameForAsyncTests");
99+
ddbStreamsTestTable = outputs.get("DdbStreamsTestTable");
98100

99101
ddbClient = DynamoDbClient.builder()
100102
.region(region)
@@ -211,6 +213,32 @@ public void kinesisBatchProcessingSucceeds() throws InterruptedException {
211213
validateAllItemsHandled(items);
212214
}
213215

216+
@Test
217+
public void ddbStreamsBatchProcessingSucceeds() throws InterruptedException {
218+
// GIVEN
219+
String theId = "my-test-id";
220+
221+
// WHEN
222+
ddbClient.putItem(PutItemRequest.builder()
223+
.tableName(ddbStreamsTestTable)
224+
.item(new HashMap<String, AttributeValue>() {
225+
{
226+
put("id", AttributeValue.builder()
227+
.s(theId)
228+
.build());
229+
}})
230+
.build());
231+
Thread.sleep(90000); // wait for function to be executed
232+
233+
// THEN
234+
ScanResponse items = ddbClient.scan(ScanRequest.builder()
235+
.tableName(outputTable)
236+
.build());
237+
238+
assertThat(items.count()).isEqualTo(1);
239+
assertThat(items.items().get(0).get("id").s()).isEqualTo(theId);
240+
}
241+
214242
private void validateAllItemsHandled(ScanResponse items) {
215243
for (Product p : testProducts) {
216244
boolean foundIt = false;

powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/testutils/Infrastructure.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import static java.util.Collections.singletonList;
1818

19-
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
2019
import com.fasterxml.jackson.databind.JsonNode;
2120
import java.io.File;
2221
import java.io.IOException;
@@ -46,18 +45,15 @@
4645
import software.amazon.awscdk.services.appconfig.CfnDeploymentStrategy;
4746
import software.amazon.awscdk.services.appconfig.CfnEnvironment;
4847
import software.amazon.awscdk.services.appconfig.CfnHostedConfigurationVersion;
49-
import software.amazon.awscdk.services.dynamodb.Attribute;
50-
import software.amazon.awscdk.services.dynamodb.AttributeType;
51-
import software.amazon.awscdk.services.dynamodb.BillingMode;
52-
import software.amazon.awscdk.services.dynamodb.Table;
53-
import software.amazon.awscdk.services.events.targets.KinesisStream;
48+
import software.amazon.awscdk.services.dynamodb.*;
5449
import software.amazon.awscdk.services.iam.PolicyStatement;
5550
import software.amazon.awscdk.services.kinesis.Stream;
5651
import software.amazon.awscdk.services.kinesis.StreamMode;
5752
import software.amazon.awscdk.services.lambda.Code;
5853
import software.amazon.awscdk.services.lambda.Function;
5954
import software.amazon.awscdk.services.lambda.StartingPosition;
6055
import software.amazon.awscdk.services.lambda.Tracing;
56+
import software.amazon.awscdk.services.lambda.eventsources.DynamoEventSource;
6157
import software.amazon.awscdk.services.lambda.eventsources.KinesisEventSource;
6258
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
6359
import software.amazon.awscdk.services.logs.LogGroup;
@@ -118,6 +114,7 @@ public class Infrastructure {
118114
private final String queue;
119115
private final String kinesisStream;
120116
private final String largeMessagesBucket;
117+
private String ddbStreamsTableName;
121118

122119
private String functionName;
123120
private Object cfnTemplate;
@@ -135,6 +132,7 @@ private Infrastructure(Builder builder) {
135132
this.queue = builder.queue;
136133
this.kinesisStream = builder.kinesisStream;
137134
this.largeMessagesBucket = builder.largeMessagesBucket;
135+
this.ddbStreamsTableName = builder.ddbStreamsTableName;
138136

139137
this.app = new App();
140138
this.stack = createStackWithLambda();
@@ -317,6 +315,23 @@ private Stack createStackWithLambda() {
317315
.build();
318316
}
319317

318+
if (!StringUtils.isEmpty(ddbStreamsTableName)) {
319+
Table ddbStreamsTable = Table.Builder.create(stack, "DDBStreamsTable")
320+
.tableName(ddbStreamsTableName)
321+
.stream(StreamViewType.KEYS_ONLY)
322+
.removalPolicy(RemovalPolicy.DESTROY)
323+
.partitionKey(Attribute.builder().name("id").type(AttributeType.STRING).build())
324+
.build();
325+
326+
DynamoEventSource ddbEventSource = DynamoEventSource.Builder.create(ddbStreamsTable)
327+
.batchSize(1)
328+
.startingPosition(StartingPosition.TRIM_HORIZON)
329+
.maxBatchingWindow(Duration.seconds(1))
330+
.build();
331+
function.addEventSource(ddbEventSource);
332+
CfnOutput.Builder.create(stack, "DdbStreamsTestTable").value(ddbStreamsTable.getTableName()).build();
333+
}
334+
320335
if (!StringUtils.isEmpty(largeMessagesBucket)) {
321336
Bucket offloadBucket = Bucket.Builder
322337
.create(stack, "LargeMessagesOffloadBucket")
@@ -481,6 +496,7 @@ public static class Builder {
481496
private String idemPotencyTable;
482497
private String queue;
483498
private String kinesisStream;
499+
private String ddbStreamsTableName;
484500

485501
private Builder() {
486502
getJavaRuntime();
@@ -561,6 +577,11 @@ public Builder kinesisStream(String stream) {
561577
return this;
562578
}
563579

580+
public Builder ddbStreamsTableName(String tableName) {
581+
this.ddbStreamsTableName = tableName;
582+
return this;
583+
}
584+
564585
public Builder largeMessagesBucket(String largeMessagesBucket) {
565586
this.largeMessagesBucket = largeMessagesBucket;
566587
return this;

0 commit comments

Comments
 (0)