Skip to content

Commit 5469287

Browse files
author
Scott Gerring
committed
Some more examples
1 parent 870ec47 commit 5469287

File tree

7 files changed

+141
-10
lines changed

7 files changed

+141
-10
lines changed

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageHandlerBuilder.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import java.util.List;
88
import java.util.function.Consumer;
99

10+
1011
/**
11-
* Let's make this builder-style so that we can extend the interface
12-
* if we need to with extra fields without breaking clients
12+
* A builder-style interface we can use within an existing Lambda RequestHandler to
13+
* deal with our batch responses.
14+
*
15+
* @see software.amazon.lambda.powertools.batch.examples.ExampleMessageHandlerBuilder
1316
*/
1417
public class BatchMessageHandlerBuilder {
1518

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchRequestHandler.java

+42-5
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,26 @@
77
import java.util.List;
88
import java.util.stream.Collectors;
99

10+
/**
11+
* An abstract base class that will be extended per-event-source to supply event-source
12+
* specific functionality. As much as possible is pulled into the base class, and
13+
* event-source specific logic is delegated downwards (e.g., mapping errors to response,
14+
* extracting messages from batch).
15+
*
16+
* IRL, this would be implemented leaning on the {@link BatchMessageHandlerBuilder}, here
17+
* I have provided implementation inline for illustrative purposes.
18+
*
19+
* @param <T> The batch event type
20+
* @param <U> The individual message type for each message within teh batch
21+
* @param <V> The batch result type
22+
*/
1023
public abstract class BatchRequestHandler<T, U, V> implements RequestHandler<T, V> {
1124

25+
/**
26+
* Used to wrap the result of processing a single message. We wrap the message itself,
27+
* and optionally an exception that was raised during the processing. A lack of
28+
* exception indicates success.
29+
*/
1230
protected class MessageProcessingResult<U> {
1331
private final U message;
1432
private final Exception exception;
@@ -19,12 +37,19 @@ public MessageProcessingResult(U message, Exception exception) {
1937
}
2038
}
2139

40+
/**
41+
* The batch processing logic goes here. This can be generic across all message types.
42+
*
43+
* @param input The batch message to process
44+
* @param context The Lambda execution environment context object.
45+
* @return
46+
*/
2247
@Override
2348
public V handleRequest(T input, Context context) {
24-
// Extract messages
49+
// Extract messages from batch
2550
List<U> messages = extractMessages(input);
2651

27-
// Try process them
52+
// For each message, map it to either 1/ a successful result, or 2/ an exception
2853
List<MessageProcessingResult<U>> results = messages.stream().map(m -> {
2954
try {
3055
processItem(m, context);
@@ -34,20 +59,32 @@ public V handleRequest(T input, Context context) {
3459
}
3560
}).collect(Collectors.toList());
3661

37-
// Generate the response
62+
// Generate the response from the list of results
3863
return writeResponse(results);
3964
}
4065

4166
/**
4267
* Provided by the event-specific child to extract the individual records
4368
* from the batch request
4469
*
45-
* @param input
46-
* @return
70+
* @param input The batch
71+
* @return the messages within the batch
4772
*/
4873
protected abstract List<U> extractMessages(T input);
4974

75+
/**
76+
* Given the set of message processing results, generates the appropriate batch
77+
* processing result to return to Lambda.
78+
*
79+
* @param results the result of processing each message, and the messages themselves
80+
* @return the batch processing result to return to lambda
81+
*/
5082
protected abstract V writeResponse(Iterable<MessageProcessingResult<U>> results);
5183

84+
/**
85+
* To be provided by the user. Processes an individual message within the batch
86+
* @param message
87+
* @param context
88+
*/
5289
public abstract void processItem(U message, Context context);
5390
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleBatchRequestHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
55
import software.amazon.lambda.powertools.batch.SQSBatchRequestHandler;
66

7+
/**
8+
* An example handler that is implemented by extending our {@link SQSBatchRequestHandler}
9+
*/
710
public class ExampleBatchRequestHandler extends SQSBatchRequestHandler {
8-
911
@Override
1012
public void processItem(SQSEvent.SQSMessage message, Context context) {
1113
// Process an SQS message without throwing

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleMessageHandlerBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
77

88
/**
9-
* This is just here for illustrative purposes, and won't
10-
* be released with this code!
9+
* An example handler that is implemented by using the {@link BatchMessageHandlerBuilder} inline
10+
* in an existing RequestHandler.
1111
*/
1212
public class ExampleMessageHandlerBuilder implements RequestHandler<SQSEvent, Object> {
1313

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package software.amazon.lambda.powertools.batch2;
2+
3+
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
4+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
5+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
7+
8+
public interface BatchProcessor<I, O> {
9+
10+
default O processBatch(I input) {
11+
// depending on the input type (SQS/Kinesis/Dynamo/...), create the appropriate response
12+
13+
// TODO - need a way of extracting items from input. Defer to interface?
14+
15+
// browse the list of items
16+
// for each item
17+
try {
18+
// Need to typematch to get the processItem calls to bind
19+
if (input instanceof SQSEvent.SQSMessage) {
20+
processItem((SQSEvent.SQSMessage) input);
21+
} else if (input instanceof KinesisEvent.KinesisEventRecord) {
22+
processItem((KinesisEvent.KinesisEventRecord) input);
23+
} else if (input instanceof DynamodbEvent.DynamodbStreamRecord) {
24+
processItem((DynamodbEvent.DynamodbStreamRecord) input);
25+
}
26+
}
27+
catch(Throwable t) {
28+
// put item in item failure list
29+
}
30+
31+
// TODO - need a way of converting resultset back to return type. Defer to interface?
32+
33+
throw new NotImplementedException();
34+
}
35+
36+
default void processItem(SQSEvent.SQSMessage message) {
37+
System.out.println(message.getMessageId());
38+
}
39+
40+
default void processItem(KinesisEvent.KinesisEventRecord record) {
41+
System.out.println(record.getEventID());
42+
}
43+
44+
default void processItem(DynamodbEvent.DynamodbStreamRecord record) {
45+
System.out.println(record.getEventID());
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package software.amazon.lambda.powertools.batch2;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
6+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
7+
8+
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse>, BatchProcessor<SQSEvent, SQSBatchResponse> {
9+
10+
@Override
11+
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
12+
// processBatch is implemented as default in the interface and handle exceptions in the processElement function to add the item to the BatchItemFailure list
13+
14+
// TODO - as we're getting this on the outside, we could pass in `sqsEvent.getMessages()` which would
15+
// fix the "how do I extract event-specific message" problem in the interface default.
16+
return this.processBatch(sqsEvent); // we may need to pass context too... ?
17+
18+
// TODO - as we're still on the outside, we could take the list of results from `this.processBatch()` and
19+
// map it back to the SQS-specific response, again solving the "how do I do this event-specific thing"
20+
// problem in the base class.
21+
}
22+
23+
// this method comes from the BatchProcessor interface, developers need to override the appropriate one
24+
@Override
25+
public void processItem(SQSEvent.SQSMessage message) {
26+
// do some stuff with this item
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package software.amazon.lambda.powertools.batch2.examples;
2+
3+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
4+
import software.amazon.lambda.powertools.batch2.ProcessSQSMessageBatch;
5+
6+
public class ExampleBatchRequestHandler extends ProcessSQSMessageBatch {
7+
8+
// this method comes from the BatchProcessor interface, developers need to override the appropriate one
9+
@Override
10+
public void processItem(SQSEvent.SQSMessage message) {
11+
// do some stuff with this item
12+
}
13+
}

0 commit comments

Comments
 (0)