Skip to content

Commit 89df51e

Browse files
docs(batch_processing): snippets split, improved, and lint (#2231)
Co-authored-by: Leandro Damascena <[email protected]>
1 parent f4821ed commit 89df51e

40 files changed

+1212
-945
lines changed

Diff for: docs/utilities/batch.md

+79-927
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

# Defaults applied to every function in this template
Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.10
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: "2012-10-17"
          Statement:
            Effect: "Allow"
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            # After retries are exhausted, failed records are sent to the DLQ
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            # Enables partial batch responses (batchItemFailures) for this
            # event source mapping
            FunctionResponseTypes:
              - ReportBatchItemFailures

  # Dead-letter queue receiving records that exhaust MaximumRetryAttempts
  SampleDLQ:
    Type: AWS::SQS::Queue

  # Source table; its stream feeds the function above
  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        # Stream records carry both old and new item images
        StreamViewType: NEW_AND_OLD_IMAGES
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

# Defaults applied to every function in this template
Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.10
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records to DLQ from Kinesis/DynamoDB
        - Version: "2012-10-17"
          Statement:
            Effect: "Allow"
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        KinesisStream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt SampleStream.Arn
            BatchSize: 100
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            # After retries are exhausted, failed records are sent to the DLQ
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            # Enables partial batch responses (batchItemFailures) for this
            # event source mapping
            FunctionResponseTypes:
              - ReportBatchItemFailures

  # Dead-letter queue receiving records that exhaust MaximumRetryAttempts
  SampleDLQ:
    Type: AWS::SQS::Queue

  # Source stream consumed by the function above
  SampleStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

# Defaults applied to every function in this template
Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.10
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Managed policy granting receive/delete on the source queue
        - SQSPollerPolicy:
            QueueName: !GetAtt SampleQueue.QueueName
      Events:
        Batch:
          Type: SQS
          Properties:
            Queue: !GetAtt SampleQueue.Arn
            # Enables partial batch responses (batchItemFailures) for this
            # event source mapping
            FunctionResponseTypes:
              - ReportBatchItemFailures

  # Dead-letter queue targeted by the source queue's redrive policy
  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 30 # Fn timeout * 6
      RedrivePolicy:
        maxReceiveCount: 2
        deadLetterTargetArn: !GetAtt SampleDLQ.Arn

Diff for: examples/batch_processing/src/advanced_accessing_lambda_context.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
from typing import Optional
32

43
from aws_lambda_powertools import Logger, Tracer
@@ -17,11 +16,9 @@
1716

1817
@tracer.capture_method
1918
def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None):
20-
payload: str = record.body
21-
if payload:
22-
item: dict = json.loads(payload)
23-
logger.info(item)
24-
...
19+
if lambda_context is not None:
20+
remaining_time = lambda_context.get_remaining_time_in_millis()
21+
logger.info(remaining_time)
2522

2623

2724
@logger.inject_lambda_context
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from typing import Optional

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None):
    """Log remaining invocation time, when the Lambda context is injected."""
    if lambda_context is None:
        return
    millis_left = lambda_context.get_remaining_time_in_millis()
    logger.info(millis_left)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # The decorator drives record processing; return the partial batch response.
    return processor.response()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from typing import Optional

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None):
    """Log remaining invocation time, when the Lambda context is injected."""
    if lambda_context is None:
        return
    millis_left = lambda_context.get_remaining_time_in_millis()
    logger.info(millis_left)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    records = event["Records"]
    # Context-manager form: the Lambda context is forwarded to each record handler.
    with processor(records=records, handler=record_handler, lambda_context=context):
        outcome = processor.process()

    return outcome
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from __future__ import annotations
2+
3+
import json
4+
from typing import List, Tuple
5+
6+
from typing_extensions import Literal
7+
8+
from aws_lambda_powertools import Logger, Tracer
9+
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
10+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
11+
from aws_lambda_powertools.utilities.typing import LambdaContext
12+
13+
processor = BatchProcessor(event_type=EventType.SQS)
14+
tracer = Tracer()
15+
logger = Logger()
16+
17+
18+
@tracer.capture_method
19+
def record_handler(record: SQSRecord):
20+
payload: str = record.body
21+
if payload:
22+
item: dict = json.loads(payload)
23+
logger.info(item)
24+
25+
26+
@logger.inject_lambda_context
27+
@tracer.capture_lambda_handler
28+
def lambda_handler(event, context: LambdaContext):
29+
batch = event["Records"]
30+
with processor(records=batch, handler=record_handler):
31+
processed_messages: List[Tuple] = processor.process()
32+
33+
for message in processed_messages:
34+
status: Literal["success"] | Literal["fail"] = message[0]
35+
record: SQSRecord = message[2]
36+
37+
logger.info(status, record=record)
38+
39+
return processor.response()
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import os
import sys
from random import randint
from typing import Any

import boto3

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BasePartialBatchProcessor,
    EventType,
    process_partial_response,
)

# Target table name; falls back to a sentinel when the env var is unset
table_name = os.getenv("TABLE_NAME", "table_not_found")

logger = Logger()


class MyPartialProcessor(BasePartialBatchProcessor):
    """
    Process records and store successful results in an Amazon DynamoDB table.

    Parameters
    ----------
    table_name: str
        DynamoDB table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name

        super().__init__(event_type=EventType.SQS)

    def _prepare(self):
        # It's called once, *before* processing
        # Creates the table resource and clears results from any previous run
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # It's called once, *after* processing all records (when the context
        # manager closes). Sends all successful messages to the DynamoDB table
        # in a single batched write.
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # It handles how your record is processed
        # Here we're keeping the status of each run,
        # where self.handler is the record_handler function passed as an argument
        try:
            result = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, result)
        except Exception as exc:
            logger.error(exc)
            return self.failure_handler(record, sys.exc_info())

    def success_handler(self, record, result: Any):
        # Returns a ("success", result, record) tuple but appends the *record*
        # to success_messages, so _clean writes records (not results) to DynamoDB
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    async def _async_process_record(self, record: dict):
        # Async processing is intentionally unsupported by this custom processor
        raise NotImplementedError()


processor = MyPartialProcessor(table_name)


def record_handler(record):
    # Example stand-in for real business logic
    return randint(0, 100)


def lambda_handler(event, context):
    # Runs the batch through the custom processor and returns the
    # partial batch response expected by ReportBatchItemFailures
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)

Diff for: examples/batch_processing/src/disable_tracing.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


# capture_response=False keeps the handler's return value out of the trace
@tracer.capture_method(capture_response=False)
def record_handler(record: SQSRecord):
    """Parse and log the JSON body of a single SQS record, if present."""
    body: str = record.body
    if not body:
        return
    item: dict = json.loads(body)
    logger.info(item)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)

0 commit comments

Comments
 (0)