---
title: Batch
description: Utility
---

import Note from "../../src/components/Note"

One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external event sources. Some of these event providers allow a feature called batch processing, in which a predefined number of events is sent to the Lambda function at once.

The batch utility provides an abstraction to handle partial failures when processing batches from an SQS event source, and exposes a base class (`BasePartialProcessor`) that allows you to create your own batch processor.

## Key Features

* Removal of successful messages from an AWS SQS batch in case of partial failure
* Build your own batch processor using the base classes

## IAM Permissions

This utility requires additional permissions to work as expected. See the following table:

| Processor | Function/Method | IAM Permission |
|-----------|-----------------|----------------|
| `PartialSQSProcessor` | `_clean` | `sqs:DeleteMessageBatch` |
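For reference, the permission could be granted with a policy statement along these lines; the queue ARN is a placeholder, and the policy is expressed as a Python dict purely for illustration:

```python
# Illustrative IAM policy document; the queue ARN is a placeholder.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sqs:DeleteMessageBatch",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        }
    ],
}
```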

## PartialSQSProcessor

SQS integration with Lambda is one of the most well-established ones and is very useful when building asynchronous applications. A common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with fewer invocations.
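The batch size is configured on the SQS event source mapping, not in the function code. As a rough sketch using boto3 (the queue ARN and function name are placeholders):

```python
import boto3

lambda_client = boto3.client("lambda")

# Placeholders: substitute your own queue ARN and function name.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:my-queue",
    FunctionName="my-function",
    BatchSize=10,  # number of messages delivered to each invocation
)
```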

As with any function execution, you may face errors in one or more records of a batch. SQS's native behavior is to redrive the whole batch to the queue, reprocessing all records, including the successful ones. This cycle can happen multiple times, depending on your configuration, until the whole batch succeeds or the maximum number of attempts is reached. Your application may run into problems with this behavior, especially if it isn't idempotent.
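For context, this is roughly the shape of the batch event a Lambda function receives from SQS; the values below are illustrative placeholders:

```python
# Illustrative SQS batch event; IDs and receipt handles are placeholders.
sqs_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEB...",
            "body": "first message",
            "eventSource": "aws:sqs",
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEB...",
            "body": "second message",
            "eventSource": "aws:sqs",
        },
    ]
}
```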

A naive approach to solving this problem is to delete successful records from the queue before the redrive phase. The `PartialSQSProcessor` class offers this solution both as a context manager and as middleware, removing all successful messages from the queue in case one or more failures occurred during the Lambda execution. Two examples are provided below, displaying the behavior of this class.

Examples:

As a context manager:

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
    return record["body"]

def lambda_handler(event, context):
    records = event["Records"]
    processor = PartialSQSProcessor()

    # highlight-start
    with processor(records, record_handler):
        result = processor.process()
    # highlight-end

    return result
```
As a decorator (middleware):

```python
from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor

def record_handler(record):
    return record["body"]

# highlight-start
@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
# highlight-end
def lambda_handler(event, context):
    return {"statusCode": 200}
```

## Create your own partial processor

You can create your own partial batch processor by inheriting the `BasePartialProcessor` class and implementing `_prepare()`, `_clean()` and `_process_record()`.

All processing logic is handled by `_process_record()`, whilst `_prepare()` and `_clean()` take care of the setup/teardown of the processor, being called at the start and end of the processor's execution, respectively.
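Conceptually, the hooks are driven in this order (a rough sketch of the call sequence, not the library's actual source):

```python
# Rough sketch of the call order; not the library's actual source.
with processor(records, record_handler):  # __enter__ calls _prepare()
    processor.process()                   # calls _process_record() once per record
# __exit__ calls _clean()
```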

Example:

```python
from random import randint

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
import boto3

def record_handler(record):
    return randint(0, 100)


class MyPartialProcessor(BasePartialProcessor):
    """
    Process a record and store successful results in a DynamoDB table

    Parameters
    ----------
    table_name: str
        Table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name

        super().__init__()

    def _prepare(self):
        # It's called once, *before* processing
        # Creates the table resource and clears previous results
        # E.g.:
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # It's called once, *after* processing all records (closing the context manager)
        # Here we're sending, at once, all successful messages to a ddb table
        # E.g.:
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # It handles how your record is processed
        # Here we're keeping the status of each run
        # E.g.:
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
def lambda_handler(event, context):
    return {"statusCode": 200}
```
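To exercise this end to end, a hand-rolled event like the one sketched earlier can be passed to the handler. Note that `_prepare()` and `_clean()` talk to DynamoDB, so this sketch assumes AWS credentials are configured and that a hypothetical `dummy-table` table exists with a key schema matching the `{"age": ...}` items:

```python
# Hypothetical local invocation; assumes AWS credentials are configured and
# a DynamoDB table named "dummy-table" exists, since _clean() writes to it.
fake_event = {
    "Records": [
        {"messageId": "1", "receiptHandle": "AQEB...", "body": "first", "eventSource": "aws:sqs"},
        {"messageId": "2", "receiptHandle": "AQEB...", "body": "second", "eventSource": "aws:sqs"},
    ]
}

print(lambda_handler(fake_event, None))  # -> {"statusCode": 200}
```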