Skip to content

Commit c466c80

Browse files
feat(event_source): Add support for S3 batch operations (aws-powertools#3572)
* Add support for S3 Batch Operations event and response along with unit tests. This seamlessly support both schema 1.0 and 2.0 A few notes: - S3BatchOperationXXX or S3BatchOperationsXXX ? - s3 key is not url-encoded in real life despite what the documentation implies. Need to test with some keys that contain spaces, etc... - S3BatchOperationResult has some factory methods to simplifies building - S3BatchOperationEvent may need to as it makes initialization needlessly complicated * Add documentation with example based on the AWS S3 documentation * Use unquote_plus and add unit test for key encoded with space * Initial refactor * Changing the DX to improve usability * Documentation * Adding parser * Small refactor * Addressing Ruben's feedback - Docs and examples * Addressing Ruben's feedback - Docs and examples * Addressing Ruben's feedback - Code * Addressing Ruben's feedback - Code --------- Co-authored-by: Leandro Damascena <[email protected]>
1 parent 9afbc78 commit c466c80

13 files changed

+622
-1
lines changed

aws_lambda_powertools/utilities/data_classes/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
)
2424
from .kinesis_stream_event import KinesisStreamEvent
2525
from .lambda_function_url_event import LambdaFunctionUrlEvent
26+
from .s3_batch_operation_event import (
27+
S3BatchOperationEvent,
28+
S3BatchOperationResponse,
29+
S3BatchOperationResponseRecord,
30+
)
2631
from .s3_event import S3Event, S3EventBridgeNotificationEvent
2732
from .secrets_manager_event import SecretsManagerEvent
2833
from .ses_event import SESEvent
@@ -52,6 +57,9 @@
5257
"LambdaFunctionUrlEvent",
5358
"S3Event",
5459
"S3EventBridgeNotificationEvent",
60+
"S3BatchOperationEvent",
61+
"S3BatchOperationResponse",
62+
"S3BatchOperationResponseRecord",
5563
"SESEvent",
5664
"SNSEvent",
5765
"SQSEvent",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
import warnings
2+
from dataclasses import dataclass, field
3+
from typing import Any, Dict, Iterator, List, Optional, Tuple
4+
from urllib.parse import unquote_plus
5+
6+
from aws_lambda_powertools.shared.types import Literal
7+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
8+
9+
# Valid result codes, shared by S3BatchOperationResponse and S3BatchOperationResponseRecord
VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]


@dataclass(repr=False, order=False)
class S3BatchOperationResponseRecord:
    """Outcome of a single S3 Batch Operations task, serialized via `asdict`."""

    task_id: str
    result_code: RESULT_CODE_TYPE
    result_string: Optional[str] = None

    def asdict(self) -> Dict[str, Any]:
        """Serialize this record to the camelCase shape S3 Batch Operations expects.

        Warns (without failing) when `result_code` is not a documented value,
        so a typo surfaces early while S3 remains the authority on rejection.
        """
        code = self.result_code
        if code not in VALID_RESULT_CODES:
            warnings.warn(
                stacklevel=2,
                message=f"The resultCode {code} is not valid. "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

        return {
            "taskId": self.task_id,
            "resultCode": code,
            "resultString": self.result_string,
        }
33+
34+
35+
@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
    """S3 Batch Operations response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
    - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

    Parameters
    ----------
    invocation_schema_version : str
        Specifies the schema version for the payload that Batch Operations sends when invoking
        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.

    invocation_id : str
        The identifier of the invocation request. This must be copied from the event.

    treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Undocumented parameter, defaults to "Succeeded"

    results : List[S3BatchOperationResponseRecord]
        Results of each S3 Batch Operations task,
        optional parameter at start. Can be added later using the `add_result` method.

    Examples
    --------

    **S3 Batch Operations**

    ```python
    import boto3

    from botocore.exceptions import ClientError

    from aws_lambda_powertools.utilities.data_classes import (
        S3BatchOperationEvent,
        S3BatchOperationResponse,
        event_source
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext


    @event_source(data_class=S3BatchOperationEvent)
    def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
        response = S3BatchOperationResponse(
            event.invocation_schema_version,
            event.invocation_id,
            "PermanentFailure"
        )

        result = None
        task = event.task
        src_key: str = task.s3_key
        src_bucket: str = task.s3_bucket

        s3 = boto3.client("s3", region_name='us-east-1')

        try:
            dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
            result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            if error_code == 'RequestTimeout':
                result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again")
            else:
                result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
        except Exception as e:
            result = task.build_task_batch_response("PermanentFailure", str(e))
        finally:
            response.add_result(result)

        return response.asdict()
    ```
    """

    invocation_schema_version: str
    invocation_id: str
    treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded"
    results: List[S3BatchOperationResponseRecord] = field(default_factory=list)

    def __post_init__(self):
        # Warn (don't fail) on an invalid value so a typo surfaces early,
        # while still letting S3 report the error authoritatively.
        if self.treat_missing_keys_as not in VALID_RESULT_CODES:
            warnings.warn(
                stacklevel=2,
                message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

    def add_result(self, result: S3BatchOperationResponseRecord):
        """Append a task result to the response."""
        self.results.append(result)

    def asdict(self) -> Dict:
        """Serialize the response to the camelCase shape S3 Batch Operations expects.

        Raises
        ------
        ValueError
            When the response does not hold exactly one result — each Lambda
            invocation processes a single task and must report a single result.
        """
        result_count = len(self.results)

        if result_count != 1:
            raise ValueError(f"Response must have exactly one result, but got {result_count}")

        return {
            "invocationSchemaVersion": self.invocation_schema_version,
            "treatMissingKeysAs": self.treat_missing_keys_as,
            "invocationId": self.invocation_id,
            "results": [result.asdict() for result in self.results],
        }
141+
142+
143+
class S3BatchOperationJob(DictWrapper):
    """Job section of an S3 Batch Operations event."""

    @property
    def get_id(self) -> str:
        """Get the job id.

        Named `get_id` rather than `id` to avoid clashing with the Python builtin.
        """
        return self["id"]

    @property
    def user_arguments(self) -> Optional[Dict[str, str]]:
        """Get user arguments provided for this job (only for invocation schema 2.0)"""
        return self.get("userArguments")
153+
154+
155+
class S3BatchOperationTask(DictWrapper):
    """A single task (object to process) from an S3 Batch Operations event."""

    @property
    def task_id(self) -> str:
        """Get the task id"""
        return self["taskId"]

    @property
    def s3_key(self) -> str:
        """Get the object key, URL-decoded with unquote_plus"""
        return unquote_plus(self["s3Key"])

    @property
    def s3_version_id(self) -> Optional[str]:
        """Object version if bucket is versioning-enabled, otherwise null"""
        return self.get("s3VersionId")

    @property
    def s3_bucket_arn(self) -> Optional[str]:
        """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
        return self.get("s3BucketArn")

    @property
    def s3_bucket(self) -> str:
        """
        Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
        or from 's3BucketArn' (invocationSchemaVersion '1.0')
        """
        bucket_arn = self.s3_bucket_arn
        # Schema 1.0 only carries the ARN; the bucket name is its final ':::' segment.
        return bucket_arn.split(":::")[-1] if bucket_arn else self["s3Bucket"]

    def build_task_batch_response(
        self,
        result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
        result_string: str = "",
    ) -> S3BatchOperationResponseRecord:
        """Create a S3BatchOperationResponseRecord directly using the task_id and given values

        Parameters
        ----------
        result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
            task result, supported value: "Succeeded", "TemporaryFailure", "PermanentFailure"
        result_string : str
            string to identify in the report
        """
        return S3BatchOperationResponseRecord(
            task_id=self.task_id,
            result_code=result_code,
            result_string=result_string,
        )
205+
206+
207+
class S3BatchOperationEvent(DictWrapper):
    """Amazon S3BatchOperation Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
    """

    @property
    def invocation_id(self) -> str:
        """Get the identifier of the invocation request"""
        return self["invocationId"]

    @property
    def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
        """
        Get the schema version for the payload that Batch Operations sends when invoking an
        AWS Lambda function. Either '1.0' or '2.0'.
        """
        return self["invocationSchemaVersion"]

    @property
    def tasks(self) -> Iterator[S3BatchOperationTask]:
        """Lazily wrap each raw task dict in a S3BatchOperationTask"""
        return (S3BatchOperationTask(raw_task) for raw_task in self["tasks"])

    @property
    def task(self) -> S3BatchOperationTask:
        """Get the first s3 batch operation task"""
        return next(self.tasks)

    @property
    def job(self) -> S3BatchOperationJob:
        """Get the s3 batch operation job"""
        return S3BatchOperationJob(self["job"])

aws_lambda_powertools/utilities/parser/models/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
S3Model,
6969
S3RecordModel,
7070
)
71+
from .s3_batch_operation import S3BatchOperationJobModel, S3BatchOperationModel, S3BatchOperationTaskModel
7172
from .s3_event_notification import (
7273
S3SqsEventNotificationModel,
7374
S3SqsEventNotificationRecordModel,
@@ -177,4 +178,7 @@
177178
"BedrockAgentEventModel",
178179
"BedrockAgentRequestBodyModel",
179180
"BedrockAgentRequestMediaModel",
181+
"S3BatchOperationJobModel",
182+
"S3BatchOperationModel",
183+
"S3BatchOperationTaskModel",
180184
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Any, Dict, List, Optional
2+
3+
from pydantic import BaseModel, validator
4+
5+
from aws_lambda_powertools.utilities.parser.types import Literal
6+
7+
8+
class S3BatchOperationTaskModel(BaseModel):
    """Task section of an S3 Batch Operations event (one object to process)."""

    taskId: str
    s3Key: str
    s3VersionId: Optional[str] = None
    s3BucketArn: Optional[str] = None
    s3Bucket: Optional[str] = None

    @validator("s3Bucket", pre=True, always=True)
    def validate_bucket(cls, current_value, values):
        # invocationSchemaVersion '1.0' only sends 's3BucketArn'; derive the
        # bucket name from the ARN when 's3Bucket' itself was not provided
        # (invocationSchemaVersion '2.0' sends 's3Bucket' directly).
        bucket_arn = values.get("s3BucketArn")
        if bucket_arn and not current_value:
            return bucket_arn.split(":::")[-1]
        return current_value
23+
24+
25+
class S3BatchOperationJobModel(BaseModel):
    """Job section of an S3 Batch Operations event.

    `userArguments` is only present for invocation schema version '2.0'.
    """

    id: str
    userArguments: Optional[Dict[str, Any]] = None
28+
29+
30+
class S3BatchOperationModel(BaseModel):
    """Top-level payload S3 Batch Operations sends when invoking a Lambda function."""

    invocationId: str
    invocationSchemaVersion: Literal["1.0", "2.0"]
    job: S3BatchOperationJobModel
    tasks: List[S3BatchOperationTaskModel]

docs/utilities/data_classes.md

+12-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ Log Data Event for Troubleshooting
7575
## Supported event sources
7676

7777
| Event Source | Data_class |
78-
| ------------------------------------------------------------------------- | -------------------------------------------------- |
78+
|---------------------------------------------------------------------------|----------------------------------------------------|
7979
| [Active MQ](#active-mq) | `ActiveMQEvent` |
8080
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
8181
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
@@ -99,6 +99,7 @@ Log Data Event for Troubleshooting
9999
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
100100
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
101101
| [S3](#s3) | `S3Event` |
102+
| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
102103
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
103104
| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
104105
| [SES](#ses) | `SESEvent` |
@@ -1076,6 +1077,16 @@ for more details.
10761077
do_something_with(f"{bucket_name}/{object_key}")
10771078
```
10781079

1080+
### S3 Batch Operations
1081+
1082+
This example is based on the AWS S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}.
1083+
1084+
=== "app.py"
1085+
1086+
```python hl_lines="4 8 10 20 25 27 29 33"
1087+
--8<-- "examples/event_sources/src/s3_batch_operation.py"
1088+
```
1089+
10791090
### S3 Object Lambda
10801091

10811092
This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}.

docs/utilities/parser.md

+1
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ Parser comes with the following built-in models:
191191
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
192192
| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
193193
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
194+
| **S3BatchOperationModel** | Lambda Event Source payload for Amazon S3 Batch Operations |
194195
| **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
195196
| **S3Model** | Lambda Event Source payload for Amazon S3 |
196197
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import boto3
2+
from botocore.exceptions import ClientError
3+
4+
from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
5+
from aws_lambda_powertools.utilities.typing import LambdaContext
6+
7+
8+
@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    # Keys that S3 reports as missing default to PermanentFailure
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    object_key: str = task.s3_key
    bucket_name: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, bucket_name, object_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        # Timeouts are transient: ask S3 Batch Operations to retry this task
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        # A result is always recorded, whatever branch produced it
        response.add_result(result)

    return response.asdict()
34+
35+
36+
def do_some_work(s3_client, src_bucket: str, src_key: str):
    """Placeholder for the user-supplied processing logic (intentionally empty)."""
    ...

0 commit comments

Comments
 (0)