---
title: SQS Batch Processing
description: Utility
---

import Note from "../../src/components/Note"

The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.

## Key Features

* Prevent successfully processed messages from being returned to SQS
* Simple interface for individually processing messages from a batch
* Build your own batch processor using the base classes

## Background

When using SQS as a Lambda event source mapping, functions can be triggered with a batch of messages from SQS. If the Lambda function fails when processing the batch, all messages in the batch will be returned to the queue. With this utility, messages within a batch are handled individually - only messages that were not successfully processed are returned to the queue. More details on how Lambda works with SQS can be found in the AWS documentation.

While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.
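One common approach to idempotency is to key processing on the SQS `messageId` so that a redelivered message becomes a no-op. A minimal sketch, where the handler name, the `len()` "work", and the in-memory set are illustrative assumptions; a real implementation would track processed IDs in a durable store such as a DynamoDB table with a conditional write:

```python
# Hypothetical sketch of an idempotent record handler. `processed_ids` stands
# in for a durable store; an in-memory set is used purely for illustration.
processed_ids = set()

def idempotent_record_handler(record):
    message_id = record["messageId"]
    if message_id in processed_ids:
        # Duplicate delivery: skip the side effect instead of repeating it
        return None
    result = len(record["body"])  # placeholder for the real processing work
    processed_ids.add(message_id)
    return result
```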

## IAM Permissions

This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.
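For instance, an identity-based policy statement granting this permission could look like the following sketch (the region, account ID, and queue name in the ARN are placeholders; scope it to your own queue):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:DeleteMessageBatch",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
    }
  ]
}
```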

## Processing messages from SQS

There are 2 ways to use this utility for processing SQS messages:

**With a decorator:**

Using the `sqs_batch_processor` decorator with your Lambda handler function, you provide a `record_handler` which is responsible for processing individual messages. It should raise an exception if it is unable to process the record. All records in the batch will be passed to this handler for processing, even if exceptions are thrown. After all messages are processed, any successfully processed ones will be deleted from the queue. If there were any messages the `record_handler` couldn't process, `SQSBatchProcessingError` will be raised. You will not have access to the processed messages within the Lambda handler; all processing logic should be performed by the `record_handler` function.

```python
from aws_lambda_powertools.utilities.batch import sqs_batch_processor

def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value

@sqs_batch_processor(record_handler=record_handler)
def lambda_handler(event, context):
    return {"statusCode": 200}
```

**With a context manager:**

If you require access to the result of processed messages, you can use the context manager. The result from calling `process()` on the context manager will be a list of all the return values from your `record_handler` function.

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor()

    with processor(records, record_handler):
        result = processor.process()  # Returns a list of all results from record_handler

    return result
```

## Passing custom boto3 config

If you need to pass custom configuration such as region to the SDK, you can pass your own botocore config object to the `sqs_batch_processor` decorator:

```python
from aws_lambda_powertools.utilities.batch import sqs_batch_processor
from botocore.config import Config

config = Config(region_name="us-east-1")  # highlight-line

def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value

@sqs_batch_processor(record_handler=record_handler, config=config)  # highlight-line
def lambda_handler(event, context):
    return {"statusCode": 200}
```

Or to the `PartialSQSProcessor` class:

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
from botocore.config import Config

config = Config(region_name="us-east-1")  # highlight-line

def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor(config=config)  # highlight-line

    with processor(records, record_handler):
        result = processor.process()

    return result
```

## Suppressing exceptions

If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` argument.

<Note type="warning">
  If your Lambda function executes successfully and returns a response, all messages in the batch will be deleted from the queue.
</Note>
```python
...
@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True)  # highlight-line
def lambda_handler(event, context):
    return {"statusCode": 200}
```

or

```python
processor = PartialSQSProcessor(config=config, suppress_exception=True)  # highlight-line

with processor(records, record_handler):
    result = processor.process()
```
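With exceptions suppressed, no error is raised, so you may want to inspect the results yourself. Assuming each entry is a `("success" | "fail", detail, record)` tuple, the shape used by the processor's default success/failure handlers, failed records can be filtered out with a sketch like this (the sample data is illustrative):

```python
# Sketch: filter failed records out of the results returned by process()
# when exceptions are suppressed. The tuple shape ("fail", <exception text>,
# <record>) is an assumption based on the default failure handler.
results = [
    ("success", 42, {"messageId": "1"}),
    ("fail", "ValueError('bad body')", {"messageId": "2"}),
]

failed_records = [record for status, _, record in results if status == "fail"]
```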

## Create your own partial processor

You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.

All processing logic is handled by `_process_record()`, whilst `_prepare()` and `_clean()` take care of the setup/teardown of the processor, being called at the start/end of the processor's execution, respectively.

You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.

Example:

```python
from random import randint

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
import boto3

def record_handler(record):
    return randint(0, 100)


class MyPartialProcessor(BasePartialProcessor):
    """
    Process a record and store successful results in a DynamoDB table

    Parameters
    ----------
    table_name: str
        Table name to write results
    """

    def __init__(self, table_name: str):
        self.table_name = table_name

        super().__init__()

    def _prepare(self):
        # It's called once, *before* processing
        # Creates the table resource and clears previous results
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # It's called once, *after* processing all records (when the context manager exits)
        # Here we're sending all successful messages to the DynamoDB table in one go
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # It handles how your record is processed
        # Here we're keeping the status of each run
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
def lambda_handler(event, context):
    return {"statusCode": 200}
```