docs(batch): extract and fix examples #1114

Closed · wants to merge 19 commits
8 changes: 8 additions & 0 deletions Makefile
@@ -90,3 +90,11 @@ changelog:

mypy:
	poetry run mypy --pretty aws_lambda_powertools

format-examples:
	poetry run isort docs/examples
	poetry run black docs/examples/*/*/*.py

lint-examples:
	poetry run python3 -m py_compile docs/examples/*/*/*.py
	cfn-lint docs/examples/*/*/*.yml
@@ -0,0 +1,25 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method(capture_response=False)
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
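
A note on the return value: processor.response() assembles the partial-failure contract Lambda expects when ReportBatchItemFailures is enabled, listing only the records that failed. A sketch of the shape (the message ID is illustrative):

# Illustrative shape of processor.response(); itemIdentifier is the
# messageId of each failed SQS record.
{
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},
    ]
}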
62 changes: 62 additions & 0 deletions docs/examples/utilities/batch/custom_batch_processor.py
@@ -0,0 +1,62 @@
import os
from random import randint

import boto3

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor

table_name = os.getenv("TABLE_NAME", "table_not_found")


class MyPartialProcessor(BasePartialProcessor):
    """
    Processes a record and stores successful results in an Amazon DynamoDB table.

    Parameters
    ----------
    table_name: str
        DynamoDB table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name

        super().__init__()

    def _prepare(self):
        # Called once, *before* processing begins
        # Create the table resource and clear any previous results
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # Called once, *after* all records are processed (when the context manager closes)
        # Here we batch-write all successful messages to the DynamoDB table at once
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # Controls how each record is processed
        # Here we keep the status of each run,
        # where self.handler is the record_handler function passed as an argument
        try:
            result = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry


def record_handler(record):
    return randint(0, 100)


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
    return {"statusCode": 200}
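
The custom processor is not tied to the decorator; mirroring the context manager examples elsewhere in this PR, a minimal sketch of the equivalent usage:

# Sketch: driving MyPartialProcessor via the context manager API
# used by the other examples in this PR.
def lambda_handler(event, context):
    records = event["Records"]

    processor = MyPartialProcessor(table_name)

    with processor(records, record_handler):
        processor.process()

    return {"statusCode": 200}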
@@ -0,0 +1,23 @@
import boto3

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

session = boto3.session.Session()


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    # do_something_with is a placeholder for your own business logic
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor(boto3_session=session)

    with processor(records, record_handler):
        result = processor.process()

    return result
17 changes: 17 additions & 0 deletions docs/examples/utilities/batch/custom_boto3_session_decorator.py
@@ -0,0 +1,17 @@
import boto3

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

session = boto3.session.Session()


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


@sqs_batch_processor(record_handler=record_handler, boto3_session=session)
def lambda_handler(event, context):
    return {"statusCode": 200}
23 changes: 23 additions & 0 deletions docs/examples/utilities/batch/custom_config_context_manager.py
@@ -0,0 +1,23 @@
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

config = Config(region_name="us-east-1")


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor(config=config)

    with processor(records, record_handler):
        result = processor.process()

    return result
17 changes: 17 additions & 0 deletions docs/examples/utilities/batch/custom_config_decorator.py
@@ -0,0 +1,17 @@
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

config = Config(region_name="us-east-1")


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


@sqs_batch_processor(record_handler=record_handler, config=config)
def lambda_handler(event, context):
    return {"statusCode": 200}
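
The config and boto3_session parameters shown above are independent; assuming sqs_batch_processor accepts both together (a combination not shown in this PR), a sketch:

# Sketch: passing a custom botocore Config and a custom boto3 session at once;
# assumes both keyword arguments, shown separately above, can be combined.
import boto3
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

config = Config(region_name="us-east-1")
session = boto3.session.Session()


def record_handler(record):
    # Placeholder business logic; raise an exception on failure as above
    return record["body"]


@sqs_batch_processor(record_handler=record_handler, config=config, boto3_session=session)
def lambda_handler(event, context):
    return {"statusCode": 200}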
@@ -0,0 +1,30 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
    # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, return list[tuple]

    return processor.response()
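
Each entry in processed_messages is a status tuple, one per record; a sketch of inspecting them (the (status, result, record) layout is assumed from the success_handler in the custom processor example above):

# Sketch: inspect per-record outcomes returned by processor.process();
# tuple layout assumed from the custom processor's success_handler above.
for status, result, record in processed_messages:
    if status == "success":
        logger.info({"status": status, "result": result})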
27 changes: 27 additions & 0 deletions docs/examples/utilities/batch/dynamodb_streams_decorator.py
@@ -0,0 +1,27 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
    # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
@@ -0,0 +1,48 @@
import json
from typing import Dict, Literal, Optional

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderDynamoDB(BaseModel):
    Message: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("Message", pre=True)
    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
        return json.loads(value["S"])


class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
    NewImage: Optional[OrderDynamoDB]
    OldImage: Optional[OrderDynamoDB]


class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
    dynamodb: OrderDynamoDBChangeRecord


processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderDynamoDBRecord):
    return record.dynamodb.NewImage.Message.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
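
For context on the validator above: DynamoDB Streams wraps attribute values in type descriptors, so the raw value reaching transform_message_to_dict looks roughly like this (payload contents are illustrative):

# Illustrative value handed to the @validator before transformation;
# json.loads(value["S"]) yields the dict Pydantic turns into Order.
value = {"S": '{"item": {"sku": "laptop"}}'}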
66 changes: 66 additions & 0 deletions docs/examples/utilities/batch/dynamodb_streams_template.yml
@@ -0,0 +1,66 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.9
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: "2012-10-17"
          Statement:
            Effect: "Allow"
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
@@ -0,0 +1,27 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text())
    payload: dict = record.kinesis.data_as_json()
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, return list[tuple]

    return processor.response()
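
Kinesis delivers record data base64-encoded; data_as_text() and data_as_json() handle the decoding. A roughly equivalent manual sketch (assuming the producer wrote a UTF-8 JSON payload):

# Sketch: manual equivalent of record.kinesis.data_as_json(),
# assuming the record data is UTF-8 encoded JSON.
import base64
import json

payload: dict = json.loads(base64.b64decode(record.kinesis.data))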