Skip to content

Commit 870ec47

Browse files
author
Scott Gerring
committed
Variant 1
1 parent c34f5d9 commit 870ec47

13 files changed

+120
-100
lines changed

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageHandlerBuilder.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package software.amazon.lambda.powertools.batch;
22

3-
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
4-
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
5-
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6-
import software.amazon.lambda.powertools.batch.BatchMessageProcessor;
7-
import software.amazon.lambda.powertools.batch.message.*;
3+
import com.amazonaws.services.lambda.runtime.events.*;
84
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
95

106
import java.time.Duration;
@@ -32,15 +28,15 @@ public BatchMessageHandlerBuilder withTimeout(Duration timeout) {
3228
}
3329

3430
// TODO - can we put a meaningful type on the message, or do we need to do it, per-message-type?
35-
public List<String> process(SQSEvent message, Consumer<SqsMessage> messageHandler) {
31+
public SQSBatchResponse process(SQSEvent message, Consumer<SQSEvent.SQSMessage> messageHandler) {
3632
throw new NotImplementedException();
3733
};
3834

39-
public List<String> process(KinesisEvent message, Consumer<KinesisDataStreamsMessage> messageHandler) {
35+
public StreamsEventResponse process(KinesisEvent message, Consumer<KinesisEvent.KinesisEventRecord> messageHandler) {
4036
throw new NotImplementedException();
4137
}
4238

43-
public List<String> process(DynamodbEvent message, Consumer<DynamoDbStreamMessage> messageHandler) {
39+
public StreamsEventResponse process(DynamodbEvent message, Consumer<DynamodbEvent.DynamodbStreamRecord> messageHandler) {
4440
throw new NotImplementedException();
4541
}
4642

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/BatchMessageProcessor.java

-18
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package software.amazon.lambda.powertools.batch;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.sun.xml.internal.ws.api.message.Message;
6+
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
10+
public abstract class BatchRequestHandler<T, U, V> implements RequestHandler<T, V> {
11+
12+
protected class MessageProcessingResult<U> {
13+
private final U message;
14+
private final Exception exception;
15+
16+
public MessageProcessingResult(U message, Exception exception) {
17+
this.message = message;
18+
this.exception = exception;
19+
}
20+
}
21+
22+
@Override
23+
public V handleRequest(T input, Context context) {
24+
// Extract messages
25+
List<U> messages = extractMessages(input);
26+
27+
// Try process them
28+
List<MessageProcessingResult<U>> results = messages.stream().map(m -> {
29+
try {
30+
processItem(m, context);
31+
return new MessageProcessingResult<>(m, null);
32+
} catch (Exception e) {
33+
return new MessageProcessingResult<>(m, e);
34+
}
35+
}).collect(Collectors.toList());
36+
37+
// Generate the response
38+
return writeResponse(results);
39+
}
40+
41+
/**
42+
* Provided by the event-specific child to extract the individual records
43+
* from the batch request
44+
*
45+
* @param input
46+
* @return
47+
*/
48+
protected abstract List<U> extractMessages(T input);
49+
50+
protected abstract V writeResponse(Iterable<MessageProcessingResult<U>> results);
51+
52+
public abstract void processItem(U message, Context context);
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package software.amazon.lambda.powertools.batch;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
5+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
6+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
7+
8+
import java.util.List;
9+
10+
public abstract class KinesisBatchRequestHandler extends BatchRequestHandler<KinesisEvent, KinesisEvent.KinesisEventRecord, StreamsEventResponse> {
11+
12+
@Override
13+
protected List<KinesisEvent.KinesisEventRecord> extractMessages(KinesisEvent input) {
14+
return input.getRecords();
15+
}
16+
17+
@Override
18+
protected StreamsEventResponse writeResponse(Iterable<MessageProcessingResult<KinesisEvent.KinesisEventRecord>> results) {
19+
// Here we map up the kinesis-specific response for the batch based on the success of the individual messages
20+
throw new NotImplementedException();
21+
}
22+
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package software.amazon.lambda.powertools.batch;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
5+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
6+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
7+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
8+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
9+
10+
import java.util.List;
11+
12+
public abstract class SQSBatchRequestHandler extends BatchRequestHandler<SQSEvent, SQSEvent.SQSMessage, SQSBatchResponse> {
13+
14+
@Override
15+
protected List<SQSEvent.SQSMessage> extractMessages(SQSEvent input) {
16+
return input.getRecords();
17+
}
18+
19+
@Override
20+
protected SQSBatchResponse writeResponse(Iterable<MessageProcessingResult<SQSEvent.SQSMessage>> results) {
21+
// Here we map up the SQS-specific response for the batch based on the success of the individual messages
22+
throw new NotImplementedException();
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package software.amazon.lambda.powertools.batch.examples;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
5+
import software.amazon.lambda.powertools.batch.SQSBatchRequestHandler;
6+
7+
public class ExampleBatchRequestHandler extends SQSBatchRequestHandler {
8+
9+
@Override
10+
public void processItem(SQSEvent.SQSMessage message, Context context) {
11+
// Process an SQS message without throwing
12+
}
13+
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleMessageHandler.java renamed to powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleMessageHandlerBuilder.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,17 @@
44
import com.amazonaws.services.lambda.runtime.RequestHandler;
55
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
66
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
7-
import software.amazon.lambda.powertools.batch.BatchMessageProcessor;
8-
import software.amazon.lambda.powertools.batch.message.BatchProcessorMessageType;
9-
import software.amazon.lambda.powertools.batch.message.SqsMessage;
10-
11-
import java.util.List;
127

138
/**
149
* This is just here for illustrative purposes, and won't
1510
* be released with this code!
1611
*/
17-
public class ExampleMessageHandler implements RequestHandler<SQSEvent, List<String>> {
12+
public class ExampleMessageHandlerBuilder implements RequestHandler<SQSEvent, Object> {
1813

1914
@Override
20-
public List<String> handleRequest(SQSEvent sqsEvent, Context context) {
15+
public Object handleRequest(SQSEvent sqsEvent, Context context) {
2116
return new BatchMessageHandlerBuilder()
22-
.process(sqsEvent, (SqsMessage message) -> {
17+
.process(sqsEvent, (SQSEvent.SQSMessage message) -> {
2318
// Process the message without throwing an exception
2419
});
2520
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/BatchProcessorMessage.java

-18
This file was deleted.

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/BatchProcessorMessageHandler.java

-7
This file was deleted.

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/BatchProcessorMessageType.java

-7
This file was deleted.

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/DynamoDbStreamMessage.java

-5
This file was deleted.

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/KinesisDataStreamsMessage.java

-15
This file was deleted.

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/message/SqsMessage.java

-14
This file was deleted.

0 commit comments

Comments
 (0)