Skip to content

Commit 9787fa9

Browse files
committed
fix: Extract batch docs examples
Changes: - Extract examples for batch - Fix Python syntax - Fix YAML syntax Related to - aws-powertools#1064
1 parent b577366 commit 9787fa9

31 files changed

+933
-872
lines changed

Diff for: Makefile

+8
Original file line numberDiff line numberDiff line change
mypy:
	poetry run mypy --pretty aws_lambda_powertools

# Auto-format the extracted documentation examples in place.
format-examples:
	poetry run isort docs/examples
	poetry run black docs/examples/*/*/*.py

# Syntax-check the Python examples and lint the CloudFormation templates.
lint-examples:
	poetry run python3 -m py_compile docs/examples/*/*/*.py
	cfn-lint docs/examples/*/*/*.yml
Original file line numberDiff line numberDiff line change
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()
processor = BatchProcessor(event_type=EventType.SQS)


@tracer.capture_method(capture_response=False)
def record_handler(record: SQSRecord):
    """Handle one SQS record; raise to mark it as a partial failure."""
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Report only the failed records back to the event source mapping.
    return processor.response()
+62
Original file line numberDiff line numberDiff line change
import os
from random import randint

import boto3

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor

table_name = os.getenv("TABLE_NAME", "table_not_found")


class MyPartialProcessor(BasePartialProcessor):
    """Partial batch processor that persists successful results to DynamoDB.

    Parameters
    ----------
    table_name: str
        Name of the Amazon DynamoDB table results are written to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name
        super().__init__()

    def _prepare(self):
        # Runs once *before* any record is processed: build the table
        # resource and drop results left over from a previous run.
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # Runs once *after* all records are processed (context manager exit):
        # flush every successful message to DynamoDB in a single batch write.
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # Called per record; self.handler is the record_handler argument
        # passed to the decorator/context manager. Track each run as
        # either a success or a failure entry.
        try:
            result = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        # Queue the result for the batched DynamoDB write done in _clean().
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry


def record_handler(record):
    return randint(0, 100)


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
    return {"statusCode": 200}
Original file line numberDiff line numberDiff line change
import boto3

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

session = boto3.session.Session()


def record_handler(record):
    # Called once per message in the batch; raise an exception when a
    # message cannot be processed successfully.
    return do_something_with(record["body"])


def lambda_handler(event, context):
    records = event["Records"]

    # Custom boto3 session, e.g. to assume a role or pin credentials.
    processor = PartialSQSProcessor(boto3_session=session)

    with processor(records, record_handler):
        result = processor.process()

    return result
Original file line numberDiff line numberDiff line change
import boto3

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

session = boto3.session.Session()


def record_handler(record):
    # Called once per message in the batch; raise an exception when a
    # message cannot be processed successfully.
    return do_something_with(record["body"])


# Pass a custom boto3 session, e.g. to assume a role or pin credentials.
@sqs_batch_processor(record_handler=record_handler, boto3_session=session)
def lambda_handler(event, context):
    return {"statusCode": 200}
Original file line numberDiff line numberDiff line change
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

config = Config(region_name="us-east-1")


def record_handler(record):
    # Called once per message in the batch; raise an exception when a
    # message cannot be processed successfully.
    return do_something_with(record["body"])


def lambda_handler(event, context):
    records = event["Records"]

    # Custom botocore Config, e.g. to override the region or retries.
    processor = PartialSQSProcessor(config=config)

    with processor(records, record_handler):
        result = processor.process()

    return result
+17
Original file line numberDiff line numberDiff line change
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

config = Config(region_name="us-east-1")


def record_handler(record):
    # Called once per message in the batch; raise an exception when a
    # message cannot be processed successfully.
    return do_something_with(record["body"])


# Pass a custom botocore Config, e.g. to override the region or retries.
@sqs_batch_processor(record_handler=record_handler, config=config)
def lambda_handler(event, context):
    return {"statusCode": 200}
Original file line numberDiff line numberDiff line change
import json

from aws_lambda_powertools import Logger, Tracer

# NOTE: `batch_processor` was imported but unused — this example drives the
# processor via its context manager, so the decorator import is removed.
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    """Handle one DynamoDB stream record; raise to mark a partial failure."""
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
    # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    # Explicit context manager: processing starts on process(); successes
    # and failures are aggregated when the manager exits.
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, return list[tuple]

    return processor.response()
Original file line numberDiff line numberDiff line change
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    """Handle one DynamoDB stream record; raise to mark a partial failure."""
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
    # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Report only the failed records back to the event source mapping.
    return processor.response()
Original file line numberDiff line numberDiff line change
import json
from typing import Dict, Literal, Optional

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    """Business payload carried inside the stream record."""

    item: dict


class OrderDynamoDB(BaseModel):
    Message: Order

    # DynamoDB serialises the payload as a JSON string under the "S" key;
    # decode it first so Pydantic can auto-initialise the nested Order model.
    @validator("Message", pre=True)
    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
        return json.loads(value["S"])


class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
    # Narrow the generic change record to our typed images.
    NewImage: Optional[OrderDynamoDB]
    OldImage: Optional[OrderDynamoDB]


class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
    dynamodb: OrderDynamoDBChangeRecord


tracer = Tracer()
logger = Logger()
processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)


@tracer.capture_method
def record_handler(record: OrderDynamoDBRecord):
    """Return the parsed order item from a typed stream record."""
    return record.dynamodb.NewImage.Message.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
Original file line numberDiff line numberDiff line change
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.9
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: "2012-10-17"
          # Statement as a list — the conventional IAM policy document form.
          Statement:
            - Effect: "Allow"
              Action:
                - sqs:GetQueueAttributes
                - sqs:GetQueueUrl
                - sqs:SendMessage
              Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        # "true" instead of "yes": the YAML 1.1 boolean coercion of "yes"
        # is parser-dependent; "true" is unambiguous in YAML 1.2.
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
Original file line numberDiff line numberDiff line change
import json

from aws_lambda_powertools import Logger, Tracer

# NOTE: `batch_processor` was imported but unused — this example drives the
# processor via its context manager, so the decorator import is removed.
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    """Handle one Kinesis record; raise to mark it as a partial failure."""
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    # Explicit context manager: processing starts on process(); successes
    # and failures are aggregated when the manager exits.
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, return list[tuple]

    return processor.response()

0 commit comments

Comments
 (0)