RFC: Addition of BatchProcessing Utilities for EventSource Mapping triggers (akin to python utility) #1082
Comments
Hi @walmsles, thank you for opening this RFC. Batch processing is definitely one of the use cases we want to look into and consider for the future. At the moment, however, we cannot commit to any timeline, as we are already working on Idempotency, Parameters, and other topics. I still think this is a valuable RFC, and I would be keen to hear what the community and other maintainers have to say in terms of design, DX, etc.
+1
I will be picking up the implementation for this -- design details are in the works!
Hi everyone, here is a design proposal. Would appreciate any feedback, especially on alternative solutions listed for some of the extra features.
Design Proposal (Request for comments)
1 Summary
The goal of this document is to propose the scope and design of the batch processing utility for Powertools for AWS (TypeScript). The utility has been implemented in the Python and Java repositories. We will use the current Python implementation (https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) as a baseline, and describe the design decisions we will make in TypeScript.
2 Motivation
Batch processing is a utility that is currently used frequently by customers in Python. Typically, when sending a batch of messages to a Lambda function to process, an error on one message will cause the entire batch to return to the queue and be processed again. The batch processing utility allows customers using SQS, Kinesis, or DynamoDB Streams connected to a Lambda function to ensure only errored messages are re-run through the Lambda function. To maintain parity with the Python version and provide this useful functionality to customers using the TypeScript repository, we provide the following design proposal.
3 Utility Interface
There are two primary usages of the batch processing utility we will aim to support: a batch processing function (3.1) and a batch processing handler (3.2).
Both usages will be able to support batch processing from SQS, Kinesis, and DynamoDB.
3.1 Batch Processing Function
This is an example of how the batch processing function would be invoked for processing records from SQS:
import {
BatchProcessor,
EventType,
processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';
const processor = new BatchProcessor({ eventType: EventType.SQS });
const recordHandler = async (record: SQSRecord): Promise<void> => {
  // example of record handling logic, this is provided by customers
  const item = JSON.parse(record.body);
  console.log(item);
};
const lambdaHandler = async (event: SQSEvent, _context: Context): Promise<SQSBatchResponse> => {
  // assumes processPartialResponse resolves to the partial batch response
  return processPartialResponse(event, recordHandler, processor);
};
3.2 Batch Processing Handler
import {
BatchProcessor,
EventType,
makeBatchHandler
} from '@aws-lambda-powertools/batch';
import type { SQSRecord, Context } from "aws-lambda";
import middy from '@middy/core';
const processor = new BatchProcessor({ eventType: EventType.SQS })
const lambdaHandler = async (record: SQSRecord, _context: Context): Promise<void> => {
  /* ...processor logic here... */
  const item = JSON.parse(record.body);
  console.log(item);
};
export const handler = middy(lambdaHandler)
  .use(makeBatchHandler({
    processor: processor
  }));
4 Additional Features
These features may not be released as part of the first release, but should be implemented soon thereafter.
4.1 Bring your own processor
class MyProcessor extends BatchProcessor {
  public successHandler(record, result): SuccessResponse {
    // custom success handling logic
  }
  public failureHandler(record, exception): FailureResponse {
    // custom failure handling logic
  }
}
4.2 Access processed messages
This will allow users to access a list of all returned values from the recordHandler.
import {
BatchProcessor,
EventType,
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';
const processor = new BatchProcessor({ eventType: EventType.SQS });
const recordHandler = async (record: SQSRecord): Promise<void> => {
  // example of record handling logic, this is provided by customers
  const item = JSON.parse(record.body);
  console.log(item);
};
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
  const batch = event.Records;
  processor.register({ records: batch, handler: recordHandler });
  // awaiting is harmless if process() turns out to be synchronous
  const processedMessages = await processor.process();
  for (const message of processedMessages) {
    const [status, record] = message;
    console.log(status, record);
  }
  return processor.response();
};
This implementation would require the definition of an additional [...]
4.3 FIFO queues for SQS
In addition to the base [...]
import {
SqsFifoPartialProcessor,
EventType,
processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';
const processor = new SqsFifoPartialProcessor();
const recordHandler = async (record: SQSRecord): Promise<void> => {
  // example of record handling logic, this is provided by customers
  const item = JSON.parse(record.body);
  console.log(item);
};
const lambdaHandler = async (event: SQSEvent, _context: Context): Promise<SQSBatchResponse> => {
  // assumes processPartialResponse resolves to the partial batch response
  return processPartialResponse(event, recordHandler, processor);
};
4.4 Accessing Lambda context
In the use case where customers may need to access Lambda context while handling records, the recordHandler can optionally be defined to take in a lambdaContext argument.
import {
BatchProcessor,
EventType,
processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';
const processor = new BatchProcessor({ eventType: EventType.SQS });
const recordHandler = async (record: SQSRecord, lambdaContext?: Context): Promise<void> => {
  lambdaContext?.getRemaining...() // access some Lambda context
};
const lambdaHandler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
  // the Lambda context is forwarded so the record handler can read it
  return processPartialResponse(event, recordHandler, processor, context);
};
Alternative solution for accessing lambda context:
Rather than accessing Lambda context directly from a [...]
// Option 2
type BatchProcessingOptions = {
context: Context;
}
const recordHandler = async (record: SQSRecord, options: BatchProcessingOptions) => {
const { context } = options;
context.getRemaining...()
}
This typing will allow us to provide additional parameters for the [...]
5 Out of Scope
Python has an integration with Pydantic & Event Source Data Classes. For TypeScript, for now, we will rely on types only. This will ensure type safety but not runtime safety/validation. We will revisit this and consider an integration when we address the Parser utility.
6 Other Discussions
6.1 Processor decorator
The processor decorator functionality is now considered legacy in Python, as customers often felt it was too complex, and the processor function was created instead to reduce boilerplate. As a result, for this design, we will skip implementation for the decorator and focus on the processor function from the start.
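To make the behavior described in the Motivation section (2) concrete, here is a minimal sketch of the partial-failure reporting that the utility is meant to automate. It is not the proposed implementation: `processBatch`, `SqsRecordLike`, and `BatchResponse` are hypothetical names defined locally so the snippet stands alone. It assumes the SQS event source mapping has `ReportBatchItemFailures` enabled, in which case Lambda retries only the messages listed in `batchItemFailures`.

```typescript
// Minimal local types so the sketch is self-contained; in a real project
// the equivalent shapes come from the `aws-lambda` type definitions.
type SqsRecordLike = { messageId: string; body: string };
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Hypothetical helper (not part of the proposed API): runs the record
// handler on each record in order and collects the IDs of failed messages.
const processBatch = async (
  records: SqsRecordLike[],
  recordHandler: (record: SqsRecordLike) => Promise<unknown>
): Promise<BatchResponse> => {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      await recordHandler(record);
    } catch (_error) {
      // Only the failed message is reported; successful ones are not retried.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
};
```

Returning an empty `batchItemFailures` list tells Lambda the whole batch succeeded. Without this kind of response, a single thrown error sends every message in the batch back to the queue.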
This is now released in v1.12.1!
BatchProcessing Utility
Adding a BatchProcessing utility with the feature set from Python is needed to simplify coding and developer experience around processing data from EventSource mappings.
Summary
Motivation
This utility, IMO, allows teams to move faster and avoid silly mistakes in batch processing. It provides a safety net for new serverless developers who make similar mistakes over and over, which I can prevent by making them use Python 😬. The automated handling of batch errors in Python works out of the box and is dead simple to use, so devs can get it right 100% of the time without thinking too deeply about it.
Combining it with Idempotency across functions (when available) is a killer feature, and is what provides the "Power" in Powertools for me.
Proposal
I am not a Node/TypeScript expert, so I am leaving a more detailed proposal to the wider TypeScript contributors, maintainers, and community.
User Experience
Drawbacks
Rationale and alternatives
Unresolved questions