From 9d8e9a48c3a1d5b4bd99ea43466556181a457265 Mon Sep 17 00:00:00 2001
From: Stephane Bailliez
Date: Thu, 28 Dec 2023 23:27:11 -0500
Subject: [PATCH 01/12] Add support for S3 Batch Operations event and response
 along with unit tests.

This seamlessly supports both schemas 1.0 and 2.0

A few notes:
- S3BatchOperationXXX or S3BatchOperationsXXX ?
- s3 key is not url-encoded in real life despite what the documentation
  implies. Need to test with some keys that contain spaces, etc...
- S3BatchOperationResult has some factory methods to simplify building
- S3BatchOperationEvent may need some too, as its initialization is
  needlessly complicated
---
 .../utilities/data_classes/__init__.py        |   8 +
 .../data_classes/s3_batch_operation_event.py  | 219 ++++++++++++++++++
 .../events/s3BatchOperationEventSchemaV1.json |  15 ++
 .../events/s3BatchOperationEventSchemaV2.json |  19 ++
 .../test_s3_batch_operation_event.py          |  50 ++++
 .../test_s3_batch_operation_response.py       | 120 ++++++++++
 6 files changed, 431 insertions(+)
 create mode 100644 aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
 create mode 100644 tests/events/s3BatchOperationEventSchemaV1.json
 create mode 100644 tests/events/s3BatchOperationEventSchemaV2.json
 create mode 100644 tests/unit/data_classes/test_s3_batch_operation_event.py
 create mode 100644 tests/unit/data_classes/test_s3_batch_operation_response.py

diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py
index fd9294bc8bb..11b8debab77 100644
--- a/aws_lambda_powertools/utilities/data_classes/__init__.py
+++ b/aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -23,6 +23,11 @@
 )
 from .kinesis_stream_event import KinesisStreamEvent
 from .lambda_function_url_event import LambdaFunctionUrlEvent
+from .s3_batch_operation_event import (
+    S3BatchOperationEvent,
+    S3BatchOperationResponse,
+    S3BatchOperationResult,
+)
 from .s3_event import S3Event, S3EventBridgeNotificationEvent
 from .secrets_manager_event import SecretsManagerEvent
 from .ses_event import SESEvent
@@ -52,6 +57,9 @@
     "LambdaFunctionUrlEvent",
     "S3Event",
     "S3EventBridgeNotificationEvent",
+    "S3BatchOperationEvent",
+    "S3BatchOperationResponse",
+    "S3BatchOperationResult",
     "SESEvent",
     "SNSEvent",
     "SQSEvent",
diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
new file mode 100644
index 00000000000..b56508a2333
--- /dev/null
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -0,0 +1,219 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+from urllib.parse import unquote
+
+from typing_extensions import Literal
+
+from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
+
+
+class S3BatchOperationJob(DictWrapper):
+    @property
+    def id(self) -> str:  # noqa: A003
+        return self["id"]
+
+    @property
+    def user_arguments(self) -> Optional[Dict[str, str]]:
+        """Get user arguments provided for this job (only for invocation schema 2.0)"""
+        return self.get("userArguments")
+
+
+class S3BatchOperationTask(DictWrapper):
+    @property
+    def task_id(self) -> str:
+        """Get the task id"""
+        return self["taskId"]
+
+    @property
+    def s3_key(self) -> str:
+        """Get the object key unquote using strict utf-8 encoding"""
+        # note: is it unquote or unquote_plus ? Example uses unquote and is different from S3Event
+        # be strict to decode to avoid mangling the key transparently
+        return unquote(self["s3Key"], encoding="utf-8", errors="strict")
+
+    @property
+    def s3_version_id(self) -> Optional[str]:
+        """Object version if bucket is versioning-enabled, otherwise null"""
+        return self.get("s3VersionId")
+
+    @property
+    def s3_bucket_arn(self) -> Optional[str]:
+        """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
+        return self.get("s3BucketArn")
+
+    @property
+    def s3_bucket(self) -> str:
+        """
+        Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
+        or from 's3BucketArn' (invocationSchemaVersion '1.0')
+        """
+        if self.s3_bucket_arn:
+            return self.s3_bucket_arn.split(":::")[-1]
+        return self["s3Bucket"]
+
+
+class S3BatchOperationEvent(DictWrapper):
+    """Amazon S3BatchOperation Event
+
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
+    """
+
+    @property
+    def invocation_id(self) -> str:
+        """Get the identifier of the invocation request"""
+        return self["invocationId"]
+
+    @property
+    def invocation_schema_version(self) -> str:
+        """ "
+        Get the schema version for the payload that Batch Operations sends when invoking an
+        AWS Lambda function. Either '1.0' or '2.0'.
+        """
+        return self["invocationSchemaVersion"]
+
+    @property
+    def tasks(self) -> Iterator[S3BatchOperationTask]:
+        for task in self["tasks"]:
+            yield S3BatchOperationTask(task)
+
+    @property
+    def task(self) -> S3BatchOperationTask:
+        """Get the first s3 batch operation task"""
+        return next(self.tasks)
+
+    @property
+    def job(self) -> S3BatchOperationJob:
+        """Get the s3 batch operation job"""
+        return S3BatchOperationJob(self["job"])
+
+
+# list of valid result code.
Used both in S3BatchOperationResult and S3BatchOperationResponse +VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure") + + +@dataclass(repr=False, order=False) +class S3BatchOperationResult: + task_id: str + result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] + result_string: Optional[str] = None + + def __post_init__(self): + if self.result_code not in VALID_RESULT_CODE_TYPES: + raise ValueError(f"Invalid result_code: {self.result_code}") + + def asdict(self) -> Dict[str, Any]: + return { + "taskId": self.task_id, + "resultCode": self.result_code, + "resultString": self.result_string, + } + + @classmethod + def as_succeeded(cls, task: S3BatchOperationTask, result_string: Optional[str] = None) -> "S3BatchOperationResult": + """Create a `Succeeded` result for a given task""" + return S3BatchOperationResult(task.task_id, "Succeeded", result_string) + + @classmethod + def as_permanent_failure( + cls, + task: S3BatchOperationTask, + result_string: Optional[str] = None, + ) -> "S3BatchOperationResult": + """Create a `PermanentFailure` result for a given task""" + return S3BatchOperationResult(task.task_id, "PermanentFailure", result_string) + + @classmethod + def as_temporary_failure( + cls, + task: S3BatchOperationTask, + result_string: Optional[str] = None, + ) -> "S3BatchOperationResult": + """Create a `TemporaryFailure` result for a given task""" + return S3BatchOperationResult(task.task_id, "TemporaryFailure", result_string) + + +@dataclass(repr=False, order=False) +class S3BatchOperationResponse: + """S3 Batch Operations response object + + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html + - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions + - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion + + Parameters + ---------- + invocation_schema_version : str + Specifies the schema version for the payload that Batch Operations sends when invoking + an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event. + + invocation_id : str + The identifier of the invocation request. This must be copied from the event. + + treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] + undocumented parameter, defaults to "PermanentFailure" + + results : List[S3BatchOperationResult] + results of each S3 Batch Operations task, + optional parameter at start. can be added later using `add_result` function. 
+ + Examples + -------- + + **S3 Batch Operations** + + ```python + from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.data_classes import ( + S3BatchOperationEvent, + S3BatchOperationResponse, + S3BatchOperationResult + ) + + def lambda_handler(event: dict, context: LambdaContext): + s3_event = S3BatchOperationEvent(event) + response = S3BatchOperationResponse(s3_event.invocation_schema_version, s3_event.invocation_id) + result = None + + task = s3_event.task + try: + do_work(task.s3_bucket, task.s3_key) + result = S3BatchOperationResult.as_succeeded(task) + except TimeoutError as e: + result = S3BatchOperationResult.as_temporary_failure(task, str(e)) + except Exception as e: + result = S3BatchOperationResult.as_permanent_failure(task, str(e)) + finally: + response.add_result(result) + + return response.asdict() + ``` + """ + + invocation_schema_version: str + invocation_id: str + treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "PermanentFailure" + results: List[S3BatchOperationResult] = field(default_factory=list) + + def __post_init__(self): + if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES: + raise ValueError(f"Invalid treat_missing_keys_as: {self.treat_missing_keys_as}") + + def add_result(self, result: S3BatchOperationResult): + self.results.append(result) + + def asdict(self) -> Dict: + if not self.results: + raise ValueError("Response must have one result") + if len(self.results) > 1: + raise ValueError("Response cannot have more than one result") + + return { + "invocationSchemaVersion": self.invocation_schema_version, + "treatMissingKeysAs": self.treat_missing_keys_as, + "invocationId": self.invocation_id, + "results": [result.asdict() for result in self.results], + } diff --git a/tests/events/s3BatchOperationEventSchemaV1.json b/tests/events/s3BatchOperationEventSchemaV1.json new file mode 100644 index 00000000000..8a7bcabf590 --- /dev/null +++ b/tests/events/s3BatchOperationEventSchemaV1.json @@ -0,0 +1,15 @@ +{ + "invocationSchemaVersion": "1.0", + "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", + "job": { + "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" + }, + "tasks": [ + { + "taskId": "dGFza2lkZ29lc2hlcmUK", + "s3Key": "prefix/dataset/dataset.20231222.json.gz", + "s3VersionId": "1", + "s3BucketArn": "arn:aws:s3:::powertools-dataset" + } + ] +} \ No newline at end of file diff --git a/tests/events/s3BatchOperationEventSchemaV2.json b/tests/events/s3BatchOperationEventSchemaV2.json new file mode 100644 index 00000000000..720dd1f0cf0 --- /dev/null +++ b/tests/events/s3BatchOperationEventSchemaV2.json @@ -0,0 +1,19 @@ +{ + "invocationSchemaVersion": "2.0", + "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", + "job": { + "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce", + "userArguments": { + "k1": "v1", + "k2": "v2" + } + }, + "tasks": [ + { + "taskId": "dGFza2lkZ29lc2hlcmUK", + "s3Key": "prefix/dataset/dataset.20231222.json.gz", + "s3VersionId": null, + "s3Bucket": "powertools-dataset" + } + ] +} \ No newline at end of file diff --git a/tests/unit/data_classes/test_s3_batch_operation_event.py b/tests/unit/data_classes/test_s3_batch_operation_event.py new file mode 100644 index 00000000000..d78c2c2dac3 --- /dev/null +++ b/tests/unit/data_classes/test_s3_batch_operation_event.py @@ -0,0 +1,50 @@ +# zzz from urllib.parse import quote_plus + +from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent +from 
tests.functional.utils import load_event + + +def test_s3_batch_operation_schema_v1(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + tasks = list(parsed_event.tasks) + assert len(tasks) == 1 + task = tasks[0] + task_raw = raw_event["tasks"][0] + + assert task.task_id == task_raw["taskId"] + assert task.s3_version_id == task_raw["s3VersionId"] + assert task.s3_bucket_arn == task_raw["s3BucketArn"] + assert task.s3_bucket == task_raw["s3BucketArn"].split(":::")[-1] + assert task.s3_key == task_raw["s3Key"] + + job = parsed_event.job + assert job.id == raw_event["job"]["id"] + assert job.user_arguments is None + + assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"] + assert parsed_event.invocation_id == raw_event["invocationId"] + + +def test_s3_batch_operation_schema_v2(): + raw_event = load_event("s3BatchOperationEventSchemaV2.json") + parsed_event = S3BatchOperationEvent(raw_event) + + tasks = list(parsed_event.tasks) + assert len(tasks) == 1 + task = tasks[0] + task_raw = raw_event["tasks"][0] + + assert task.task_id == task_raw["taskId"] + assert task.s3_version_id == task_raw["s3VersionId"] + assert task.s3_bucket_arn is None + assert task.s3_bucket == task_raw["s3Bucket"] + assert task.s3_key == task_raw["s3Key"] + + job = parsed_event.job + assert job.id == raw_event["job"]["id"] + assert job.user_arguments == raw_event["job"]["userArguments"] + + assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"] + assert parsed_event.invocation_id == raw_event["invocationId"] diff --git a/tests/unit/data_classes/test_s3_batch_operation_response.py b/tests/unit/data_classes/test_s3_batch_operation_response.py new file mode 100644 index 00000000000..a0feccbcaee --- /dev/null +++ b/tests/unit/data_classes/test_s3_batch_operation_response.py @@ -0,0 +1,120 @@ +import pytest + +from aws_lambda_powertools.utilities.data_classes import ( + S3BatchOperationEvent, + S3BatchOperationResponse, + S3BatchOperationResult, +) +from tests.functional.utils import load_event + + +def test_result_as_succeeded(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + result_string = "successful op" + result = S3BatchOperationResult.as_succeeded( + parsed_event.task, + result_string, + ) + + assert_result(result, parsed_event.task.task_id, "Succeeded", result_string) + + +def test_result_as_temporary_failure(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + result_string = "failure op" + result = S3BatchOperationResult.as_temporary_failure( + parsed_event.task, + result_string, + ) + + assert_result(result, parsed_event.task.task_id, "TemporaryFailure", result_string) + + +def test_result_as_permanent_failure(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + result_string = "failure op" + result = S3BatchOperationResult.as_permanent_failure( + parsed_event.task, + result_string, + ) + + assert_result(result, parsed_event.task.task_id, "PermanentFailure", result_string) + + +def assert_result(result: S3BatchOperationResult, task_id: str, expected_result_code: str, expected_result_string: str): + assert result.result_code == expected_result_code + assert result.result_string == expected_result_string + + meta = result.asdict() + + assert meta == { + "taskId": task_id, + "resultCode": 
expected_result_code, + "resultString": expected_result_string, + } + + +def test_response(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) + result_string = "successful op" + result = S3BatchOperationResult.as_succeeded( + parsed_event.task, + result_string, + ) + response.add_result(result) + + assert len(response.results) == 1 + assert response.treat_missing_keys_as == "PermanentFailure" + assert response.invocation_schema_version == parsed_event.invocation_schema_version + assert response.invocation_id == parsed_event.invocation_id + + assert response.asdict() == { + "invocationSchemaVersion": parsed_event.invocation_schema_version, + "treatMissingKeysAs": "PermanentFailure", + "invocationId": parsed_event.invocation_id, + "results": [ + { + "taskId": result.task_id, + "resultCode": result.result_code, + "resultString": result.result_string, + }, + ], + } + + +def test_response_multiple_results(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) + result_string = "successful op" + result = S3BatchOperationResult.as_succeeded( + parsed_event.task, + result_string, + ) + response.add_result(result) + + # add another result + response.add_result(result) + + with pytest.raises(ValueError, match=r"Response cannot have more than one result"): + response.asdict() + + +def test_response_no_results(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) + + with pytest.raises(ValueError, match=r"Response must have one result"): + response.asdict() From 319a048de2722fb8a17e3cf67e52a062e0f0fdf1 Mon Sep 17 00:00:00 2001 From: Stephane Bailliez Date: Thu, 28 Dec 2023 23:29:33 -0500 Subject: [PATCH 02/12] Add documentation with example based on the AWS S3 documentation --- docs/utilities/data_classes.md | 89 +++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 37d3725967c..14b92f94122 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -75,7 +75,7 @@ Log Data Event for Troubleshooting ## Supported event sources | Event Source | Data_class | -| ------------------------------------------------------------------------- | -------------------------------------------------- | +|---------------------------------------------------------------------------|----------------------------------------------------| | [Active MQ](#active-mq) | `ActiveMQEvent` | | [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` | | [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` | @@ -99,6 +99,7 @@ Log Data Event for Troubleshooting | [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` | | [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` | | [S3](#s3) | `S3Event` | +| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` | | [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` | | [S3 EventBridge Notification](#s3-eventbridge-notification) | 
`S3EventBridgeNotificationEvent` | | [SES](#ses) | `SESEvent` | @@ -1076,6 +1077,92 @@ for more details. do_something_with(f"{bucket_name}/{object_key}") ``` +### S3 Batch Operations + +This example is based on the AWS S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}. + +=== "app.py" + + ```python hl_lines="4-8 17-19 24-26 38-40 55-57 61-63 72 75 77" + import boto3 + from botocore.exceptions import ClientError + + from aws_lambda_powertools.utilities.data_classes import ( + S3BatchOperationEvent, + S3BatchOperationResult, + S3BatchOperationResponse, + ) + + import logging + + logger = logging.getLogger(__name__) + logger.setLevel("INFO") + + s3 = boto3.client("s3") + + @event_route(data_class=S3BatchOperationEvent) + def lambda_handler(event: S3BatchOperationEvent, context): + response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id) + result = None + task = event.task + + try: + obj_key = task.s3_key + obj_version_id = task.s3_version_id + bucket_name = task.s3_bucket + + logger.info( + "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key + ) + + try: + # If this call does not raise an error, the object version is not a delete + # marker and should not be deleted. + head_response = s3.head_object( + Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id + ) + result = S3BatchOperationResult.as_permanent_failure(task, + f"Object {obj_key}, ID {obj_version_id} is not a delete marker." + ) + + logger.debug(head_response) + except ClientError as error: + delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( + "x-amz-delete-marker", "false" + ) + if delete_marker == "true": + logger.info( + "Object %s, version %s is a delete marker.", obj_key, obj_version_id + ) + try: + s3.delete_object( + Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id + ) + result = S3BatchOperationResult.as_succeeded(task, + f"Successfully removed delete marker {obj_version_id} from object {obj_key}." + ) + except ClientError as error: + # Mark request timeout as a temporary failure so it will be retried. + if error.response["Error"]["Code"] == "RequestTimeout": + result = S3BatchOperationResult.as_temporary_failure( + task, f"Attempt to remove delete marker from object {obj_key} timed out." + ) + else: + raise + else: + raise ValueError( + f"The x-amz-delete-marker header is either not present or is not 'true'." + ) + except Exception as error: + # Mark all other exceptions as permanent failures. + result = S3BatchOperationResult.as_permanent_failure(task, str(error)) + logger.exception(error) + finally: + response.add_result(result) + + return response.asdict() + ``` + ### S3 Object Lambda This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}. 
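The `unquote` vs `unquote_plus` question raised in patch 01's notes is what the next patch settles: per its comments, the key in the manifest is URL-encoded with `%20`, but the event delivers `s3Key` with spaces encoded as `+`, so `unquote` would leave the `+` characters in place while `unquote_plus` restores the original key. A minimal standard-library sketch of the difference, using an illustrative key:

```python
from urllib.parse import unquote, unquote_plus

# An original key "object key with spaces.csv" is delivered in the event
# as "object+key+with+spaces.csv" (see the test added in the next patch).
s3_key = "object+key+with+spaces.csv"

assert unquote(s3_key) == "object+key+with+spaces.csv"       # "+" survives
assert unquote_plus(s3_key) == "object key with spaces.csv"  # "+" -> space

# Percent-encoded sequences decode identically with either function:
assert unquote("a%20b") == unquote_plus("a%20b") == "a b"
```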
From d834c96459945608d200a0120c525b455da48ce3 Mon Sep 17 00:00:00 2001
From: Stephane Bailliez
Date: Fri, 29 Dec 2023 19:37:59 -0500
Subject: [PATCH 03/12] Use unquote_plus and add unit test for key encoded
 with space

---
 .../data_classes/s3_batch_operation_event.py       | 11 ++++++-----
 .../data_classes/test_s3_batch_operation_event.py  | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index b56508a2333..323c6b8c165 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass, field
 from typing import Any, Dict, Iterator, List, Optional, Tuple
-from urllib.parse import unquote
+from urllib.parse import unquote_plus
 
 from typing_extensions import Literal
 
@@ -26,10 +26,11 @@ def task_id(self) -> str:
 
     @property
     def s3_key(self) -> str:
-        """Get the object key unquote using strict utf-8 encoding"""
-        # note: is it unquote or unquote_plus ? Example uses unquote and is different from S3Event
-        # be strict to decode to avoid mangling the key transparently
-        return unquote(self["s3Key"], encoding="utf-8", errors="strict")
+        """Get the object key unquote_plus using strict utf-8 encoding"""
+        # note: AWS documentation example is using unquote but this actually
+        # contradicts what happens in practice. The key is url encoded with %20
+        # in the inventory file but in the event it is sent with +. So use unquote_plus
+        return unquote_plus(self["s3Key"], encoding="utf-8", errors="strict")
 
     @property
     def s3_version_id(self) -> Optional[str]:
diff --git a/tests/unit/data_classes/test_s3_batch_operation_event.py b/tests/unit/data_classes/test_s3_batch_operation_event.py
index d78c2c2dac3..ed608b7a78a 100644
--- a/tests/unit/data_classes/test_s3_batch_operation_event.py
+++ b/tests/unit/data_classes/test_s3_batch_operation_event.py
@@ -1,5 +1,3 @@
-# zzz from urllib.parse import quote_plus
-
 from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent
 from tests.functional.utils import load_event
 
@@ -48,3 +46,15 @@ def test_s3_batch_operation_schema_v2():
 
     assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"]
     assert parsed_event.invocation_id == raw_event["invocationId"]
+
+
+def test_s3_task_has_key_with_spaces():
+    raw_event = load_event("s3BatchOperationEventSchemaV1.json")
+
+    # When the inventory file is provided, the key must be url encoded
+    # and the file will contain "object%20key%20with%20spaces.csv"
+    # however the event is sent with s3Key as "object+key+with+spaces.csv"
+    raw_event["tasks"][0]["s3Key"] = "object+key+with+spaces.csv"
+    parsed_event = S3BatchOperationEvent(raw_event)
+
+    assert parsed_event.task.s3_key == "object key with spaces.csv"

From a401507fde10da1a5f1c2532c00f9f3faaab5089 Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Tue, 16 Jan 2024 14:12:27 +0000
Subject: [PATCH 04/12] Initial refactor

---
 .../data_classes/s3_batch_operation_event.py       | 12 +++++-------
 .../data_classes/test_s3_batch_operation_event.py  |  4 ++--
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index 323c6b8c165..e0f27523b74 100644
---
a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py +++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py @@ -9,7 +9,8 @@ class S3BatchOperationJob(DictWrapper): @property - def id(self) -> str: # noqa: A003 + def get_id(self) -> str: + # Note: this name conflicts with existing python builtins return self["id"] @property @@ -26,11 +27,8 @@ def task_id(self) -> str: @property def s3_key(self) -> str: - """Get the object key unquote_plus using strict utf-8 encoding""" - # note: AWS documentation example is using unquote but this actually - # contradicts what happens in practice. The key is url encoded with %20 - # in the inventory file but in the event it is sent with +. So use unquote_plus - return unquote_plus(self["s3Key"], encoding="utf-8", errors="strict") + """Get the object key using unquote_plus""" + return unquote_plus(self["s3Key"]) @property def s3_version_id(self) -> Optional[str]: @@ -67,7 +65,7 @@ def invocation_id(self) -> str: return self["invocationId"] @property - def invocation_schema_version(self) -> str: + def invocation_schema_version(self) -> Literal["1.0", "2.0"]: """ " Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'. diff --git a/tests/unit/data_classes/test_s3_batch_operation_event.py b/tests/unit/data_classes/test_s3_batch_operation_event.py index ed608b7a78a..ca0d4ae635c 100644 --- a/tests/unit/data_classes/test_s3_batch_operation_event.py +++ b/tests/unit/data_classes/test_s3_batch_operation_event.py @@ -18,7 +18,7 @@ def test_s3_batch_operation_schema_v1(): assert task.s3_key == task_raw["s3Key"] job = parsed_event.job - assert job.id == raw_event["job"]["id"] + assert job.get_id == raw_event["job"]["id"] assert job.user_arguments is None assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"] @@ -41,7 +41,7 @@ def test_s3_batch_operation_schema_v2(): assert task.s3_key == task_raw["s3Key"] job = parsed_event.job - assert job.id == raw_event["job"]["id"] + assert job.get_id == raw_event["job"]["id"] assert job.user_arguments == raw_event["job"]["userArguments"] assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"] From bd29d0a9cdba4f3d36c10a1afbff63680c58469a Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 16 Jan 2024 15:39:24 +0000 Subject: [PATCH 05/12] Changing the DX to improve usability --- .../utilities/data_classes/__init__.py | 4 +- .../data_classes/s3_batch_operation_event.py | 280 ++++++++++-------- .../test_s3_batch_operation_response.py | 94 ++++-- 3 files changed, 215 insertions(+), 163 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 11b8debab77..38274f0bab4 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -26,7 +26,7 @@ from .s3_batch_operation_event import ( S3BatchOperationEvent, S3BatchOperationResponse, - S3BatchOperationResult, + S3BatchOperationResponseRecord, ) from .s3_event import S3Event, S3EventBridgeNotificationEvent from .secrets_manager_event import SecretsManagerEvent @@ -59,7 +59,7 @@ "S3EventBridgeNotificationEvent", "S3BatchOperationEvent", "S3BatchOperationResponse", - "S3BatchOperationResult", + "S3BatchOperationResponseRecord", "SESEvent", "SNSEvent", "SQSEvent", diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py 
b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index e0f27523b74..7ae6a5fad3e 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -1,3 +1,4 @@
+import warnings
 from dataclasses import dataclass, field
 from typing import Any, Dict, Iterator, List, Optional, Tuple
 from urllib.parse import unquote_plus
@@ -6,6 +7,135 @@
 from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
 
+# list of valid result codes. Used both in S3BatchOperationResponse and S3BatchOperationResponseRecord
+VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
+
+
+@dataclass(repr=False, order=False)
+class S3BatchOperationResponseRecord:
+    task_id: str
+    result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
+    result_string: Optional[str] = None
+
+    def asdict(self) -> Dict[str, Any]:
+        if self.result_code not in VALID_RESULT_CODE_TYPES:
+            warnings.warn(
+                stacklevel=2,
+                message=f"The resultCode {self.result_code} is not valid. "
+                "Choose from 'Succeeded', 'TemporaryFailure' or 'PermanentFailure'",
+            )
+
+        return {
+            "taskId": self.task_id,
+            "resultCode": self.result_code,
+            "resultString": self.result_string,
+        }
+
+
+@dataclass(repr=False, order=False)
+class S3BatchOperationResponse:
+    """S3 Batch Operations response object
+
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
+    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
+    - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion
+
+    Parameters
+    ----------
+    invocation_schema_version : str
+        Specifies the schema version for the payload that Batch Operations sends when invoking
+        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.
+
+    invocation_id : str
+        The identifier of the invocation request. This must be copied from the event.
+
+    treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
+        undocumented parameter, defaults to "PermanentFailure"
+
+    results : List[S3BatchOperationResponseRecord]
+        results of each S3 Batch Operations task,
+        optional parameter at start. can be added later using `add_result` function.
+ + Examples + -------- + + **S3 Batch Operations** + + ```python + import boto3 + + from botocore.exceptions import ClientError + + from aws_lambda_powertools.utilities.data_classes import ( + S3BatchOperationEvent, + S3BatchOperationResponse, + event_source + ) + from aws_lambda_powertools.utilities.typing import LambdaContext + + + @event_source(data_class=S3BatchOperationEvent) + def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): + response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure") + + result = None + task = event.task + src_key: str = task.s3_key + src_bucket: str = task.s3_bucket + + s3 = boto3.client("s3", region_name='us-east-1') + + try: + dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) + result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") + except ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + if error_code == 'RequestTimeout': + result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.") + else: + result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") + except Exception as e: + result = task.build_task_batch_response("PermanentFailure", str(e)) + finally: + response.add_result(result) + + return response.asdict() + + ``` + """ # noqa: E501 + + invocation_schema_version: str + invocation_id: str + treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded" + results: List[S3BatchOperationResponseRecord] = field(default_factory=list) + + def __post_init__(self): + if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES: + warnings.warn( + stacklevel=2, + message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, " + "Choose from 'Succeeded', 'TemporaryFailure', 'PermanentFailure'", + ) + + def add_result(self, result: S3BatchOperationResponseRecord): + self.results.append(result) + + def asdict(self) -> Dict: + result_count = len(self.results) + + if result_count != 1: + raise ValueError(f"Response must have exactly one result, but got {result_count}") + + return { + "invocationSchemaVersion": self.invocation_schema_version, + "treatMissingKeysAs": self.treat_missing_keys_as, + "invocationId": self.invocation_id, + "results": [result.asdict() for result in self.results], + } + class S3BatchOperationJob(DictWrapper): @property @@ -50,6 +180,26 @@ def s3_bucket(self) -> str: return self.s3_bucket_arn.split(":::")[-1] return self["s3Bucket"] + def build_task_batch_response( + self, + result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded", + result_string: str = "", + ) -> S3BatchOperationResponseRecord: + """Create a S3BatchOperationResponseRecord directly using the record_id and given values + + Parameters + ---------- + result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded" + task result, supported value: "Succeeded", "TemporaryFailure", "PermanentFailure" + result_string : str + string to identify in the report + """ + return S3BatchOperationResponseRecord( + task_id=self.task_id, + result_code=result_code, + result_string=result_string, + ) + class S3BatchOperationEvent(DictWrapper): """Amazon S3BatchOperation Event @@ -86,133 +236,3 @@ def task(self) -> S3BatchOperationTask: def job(self) -> S3BatchOperationJob: """Get the s3 batch operation job""" return 
S3BatchOperationJob(self["job"]) - - -# list of valid result code. Used both in S3BatchOperationResult and S3BatchOperationResponse -VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure") - - -@dataclass(repr=False, order=False) -class S3BatchOperationResult: - task_id: str - result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] - result_string: Optional[str] = None - - def __post_init__(self): - if self.result_code not in VALID_RESULT_CODE_TYPES: - raise ValueError(f"Invalid result_code: {self.result_code}") - - def asdict(self) -> Dict[str, Any]: - return { - "taskId": self.task_id, - "resultCode": self.result_code, - "resultString": self.result_string, - } - - @classmethod - def as_succeeded(cls, task: S3BatchOperationTask, result_string: Optional[str] = None) -> "S3BatchOperationResult": - """Create a `Succeeded` result for a given task""" - return S3BatchOperationResult(task.task_id, "Succeeded", result_string) - - @classmethod - def as_permanent_failure( - cls, - task: S3BatchOperationTask, - result_string: Optional[str] = None, - ) -> "S3BatchOperationResult": - """Create a `PermanentFailure` result for a given task""" - return S3BatchOperationResult(task.task_id, "PermanentFailure", result_string) - - @classmethod - def as_temporary_failure( - cls, - task: S3BatchOperationTask, - result_string: Optional[str] = None, - ) -> "S3BatchOperationResult": - """Create a `TemporaryFailure` result for a given task""" - return S3BatchOperationResult(task.task_id, "TemporaryFailure", result_string) - - -@dataclass(repr=False, order=False) -class S3BatchOperationResponse: - """S3 Batch Operations response object - - Documentation: - -------------- - - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html - - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions - - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion - - Parameters - ---------- - invocation_schema_version : str - Specifies the schema version for the payload that Batch Operations sends when invoking - an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event. - - invocation_id : str - The identifier of the invocation request. This must be copied from the event. - - treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] - undocumented parameter, defaults to "PermanentFailure" - - results : List[S3BatchOperationResult] - results of each S3 Batch Operations task, - optional parameter at start. can be added later using `add_result` function. 
- - Examples - -------- - - **S3 Batch Operations** - - ```python - from aws_lambda_powertools.utilities.typing import LambdaContext - from aws_lambda_powertools.utilities.data_classes import ( - S3BatchOperationEvent, - S3BatchOperationResponse, - S3BatchOperationResult - ) - - def lambda_handler(event: dict, context: LambdaContext): - s3_event = S3BatchOperationEvent(event) - response = S3BatchOperationResponse(s3_event.invocation_schema_version, s3_event.invocation_id) - result = None - - task = s3_event.task - try: - do_work(task.s3_bucket, task.s3_key) - result = S3BatchOperationResult.as_succeeded(task) - except TimeoutError as e: - result = S3BatchOperationResult.as_temporary_failure(task, str(e)) - except Exception as e: - result = S3BatchOperationResult.as_permanent_failure(task, str(e)) - finally: - response.add_result(result) - - return response.asdict() - ``` - """ - - invocation_schema_version: str - invocation_id: str - treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "PermanentFailure" - results: List[S3BatchOperationResult] = field(default_factory=list) - - def __post_init__(self): - if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES: - raise ValueError(f"Invalid treat_missing_keys_as: {self.treat_missing_keys_as}") - - def add_result(self, result: S3BatchOperationResult): - self.results.append(result) - - def asdict(self) -> Dict: - if not self.results: - raise ValueError("Response must have one result") - if len(self.results) > 1: - raise ValueError("Response cannot have more than one result") - - return { - "invocationSchemaVersion": self.invocation_schema_version, - "treatMissingKeysAs": self.treat_missing_keys_as, - "invocationId": self.invocation_id, - "results": [result.asdict() for result in self.results], - } diff --git a/tests/unit/data_classes/test_s3_batch_operation_response.py b/tests/unit/data_classes/test_s3_batch_operation_response.py index a0feccbcaee..c7106e0bfb7 100644 --- a/tests/unit/data_classes/test_s3_batch_operation_response.py +++ b/tests/unit/data_classes/test_s3_batch_operation_response.py @@ -3,7 +3,7 @@ from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, S3BatchOperationResponse, - S3BatchOperationResult, + S3BatchOperationResponseRecord, ) from tests.functional.utils import load_event @@ -12,11 +12,10 @@ def test_result_as_succeeded(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - result_string = "successful op" - result = S3BatchOperationResult.as_succeeded( - parsed_event.task, - result_string, - ) + task = parsed_event.task + + result_string = "Successfully processed" + result = task.build_task_batch_response("Succeeded", result_string) assert_result(result, parsed_event.task.task_id, "Succeeded", result_string) @@ -25,11 +24,10 @@ def test_result_as_temporary_failure(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - result_string = "failure op" - result = S3BatchOperationResult.as_temporary_failure( - parsed_event.task, - result_string, - ) + task = parsed_event.task + + result_string = "Temporary failure" + result = task.build_task_batch_response("TemporaryFailure", result_string) assert_result(result, parsed_event.task.task_id, "TemporaryFailure", result_string) @@ -38,16 +36,20 @@ def test_result_as_permanent_failure(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - 
result_string = "failure op" - result = S3BatchOperationResult.as_permanent_failure( - parsed_event.task, - result_string, - ) + task = parsed_event.task + + result_string = "Permanent failure" + result = task.build_task_batch_response("PermanentFailure", result_string) assert_result(result, parsed_event.task.task_id, "PermanentFailure", result_string) -def assert_result(result: S3BatchOperationResult, task_id: str, expected_result_code: str, expected_result_string: str): +def assert_result( + result: S3BatchOperationResponseRecord, + task_id: str, + expected_result_code: str, + expected_result_string: str, +): assert result.result_code == expected_result_code assert result.result_string == expected_result_string @@ -64,12 +66,17 @@ def test_response(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) - result_string = "successful op" - result = S3BatchOperationResult.as_succeeded( - parsed_event.task, - result_string, + task = parsed_event.task + + response = S3BatchOperationResponse( + parsed_event.invocation_schema_version, + parsed_event.invocation_id, + "PermanentFailure", ) + + result_string = "Successfully processed" + result = task.build_task_batch_response("Succeeded", result_string) + response.add_result(result) assert len(response.results) == 1 @@ -95,18 +102,19 @@ def test_response_multiple_results(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) - result_string = "successful op" - result = S3BatchOperationResult.as_succeeded( - parsed_event.task, - result_string, - ) + task = parsed_event.task + + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded") + + result_string = "Successfully processed" + result = task.build_task_batch_response("Succeeded", result_string) + response.add_result(result) # add another result response.add_result(result) - with pytest.raises(ValueError, match=r"Response cannot have more than one result"): + with pytest.raises(ValueError, match=r"Response must have exactly one result, but got *"): response.asdict() @@ -114,7 +122,31 @@ def test_response_no_results(): raw_event = load_event("s3BatchOperationEventSchemaV1.json") parsed_event = S3BatchOperationEvent(raw_event) - response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id) + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded") + + with pytest.raises(ValueError, match=r"Response must have exactly one result, but got *"): + response.asdict() + + +def test_invalid_treating_missing_key(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + with pytest.warns(UserWarning, match="The value *"): + S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "invalid_value") + + +def test_invalid_record_status(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event = S3BatchOperationEvent(raw_event) + + task = parsed_event.task + + response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded") + + result_string = "Successfully 
processed" + result = task.build_task_batch_response("invalid_value", result_string) + response.add_result(result) - with pytest.raises(ValueError, match=r"Response must have one result"): + with pytest.warns(UserWarning, match="The resultCode *"): response.asdict() From dfb461855c4f46468279ea44c5105f3f202f7c2a Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 16 Jan 2024 15:43:02 +0000 Subject: [PATCH 06/12] Documentation --- docs/utilities/data_classes.md | 82 +++++++++------------------------- 1 file changed, 22 insertions(+), 60 deletions(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 14b92f94122..63decf7ad49 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -1083,80 +1083,42 @@ This example is based on the AWS S3 Batch Operations documentation [Example Lamb === "app.py" - ```python hl_lines="4-8 17-19 24-26 38-40 55-57 61-63 72 75 77" + ```python hl_lines="5-8 13 15 26 31 33 35" import boto3 + from botocore.exceptions import ClientError from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, - S3BatchOperationResult, S3BatchOperationResponse, + event_source ) + from aws_lambda_powertools.utilities.typing import LambdaContext - import logging - - logger = logging.getLogger(__name__) - logger.setLevel("INFO") - s3 = boto3.client("s3") + @event_source(data_class=S3BatchOperationEvent) + def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): + response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure") - @event_route(data_class=S3BatchOperationEvent) - def lambda_handler(event: S3BatchOperationEvent, context): - response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id) result = None task = event.task + src_key: str = task.s3_key + src_bucket: str = task.s3_bucket + + s3 = boto3.client("s3", region_name='us-east-1') try: - obj_key = task.s3_key - obj_version_id = task.s3_version_id - bucket_name = task.s3_bucket - - logger.info( - "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key - ) - - try: - # If this call does not raise an error, the object version is not a delete - # marker and should not be deleted. - head_response = s3.head_object( - Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id - ) - result = S3BatchOperationResult.as_permanent_failure(task, - f"Object {obj_key}, ID {obj_version_id} is not a delete marker." - ) - - logger.debug(head_response) - except ClientError as error: - delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( - "x-amz-delete-marker", "false" - ) - if delete_marker == "true": - logger.info( - "Object %s, version %s is a delete marker.", obj_key, obj_version_id - ) - try: - s3.delete_object( - Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id - ) - result = S3BatchOperationResult.as_succeeded(task, - f"Successfully removed delete marker {obj_version_id} from object {obj_key}." - ) - except ClientError as error: - # Mark request timeout as a temporary failure so it will be retried. - if error.response["Error"]["Code"] == "RequestTimeout": - result = S3BatchOperationResult.as_temporary_failure( - task, f"Attempt to remove delete marker from object {obj_key} timed out." - ) - else: - raise - else: - raise ValueError( - f"The x-amz-delete-marker header is either not present or is not 'true'." - ) - except Exception as error: - # Mark all other exceptions as permanent failures. 
- result = S3BatchOperationResult.as_permanent_failure(task, str(error)) - logger.exception(error) + dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) + result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") + except ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + if error_code == 'RequestTimeout': + result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.") + else: + result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") + except Exception as e: + result = task.build_task_batch_response("PermanentFailure", str(e)) finally: response.add_result(result) From b7786090c634c09baac519814c8bbf2efd4f176b Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 16 Jan 2024 16:23:37 +0000 Subject: [PATCH 07/12] Adding parser --- .../utilities/parser/models/__init__.py | 4 ++ .../parser/models/s3_batch_operation.py | 34 +++++++++++++++++ docs/utilities/parser.md | 1 + tests/unit/parser/test_s3_batch_operation.py | 38 +++++++++++++++++++ ...bject_event.py => test_s3_object_event.py} | 0 5 files changed, 77 insertions(+) create mode 100644 aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py create mode 100644 tests/unit/parser/test_s3_batch_operation.py rename tests/unit/parser/{test_s3 object_event.py => test_s3_object_event.py} (100%) diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index db3aa524377..c036490ec53 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -68,6 +68,7 @@ S3Model, S3RecordModel, ) +from .s3_batch_operation import S3BatchOperationJobModel, S3BatchOperationModel, S3BatchOperationTaskModel from .s3_event_notification import ( S3SqsEventNotificationModel, S3SqsEventNotificationRecordModel, @@ -177,4 +178,7 @@ "BedrockAgentEventModel", "BedrockAgentRequestBodyModel", "BedrockAgentRequestMediaModel", + "S3BatchOperationJobModel", + "S3BatchOperationModel", + "S3BatchOperationTaskModel", ] diff --git a/aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py b/aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py new file mode 100644 index 00000000000..1b7961999bd --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py @@ -0,0 +1,34 @@ +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, validator + +from aws_lambda_powertools.utilities.parser.types import Literal + + +class S3BatchOperationTaskModel(BaseModel): + taskId: str + s3Key: str + s3VersionId: Optional[str] = None + s3BucketArn: Optional[str] = None + s3Bucket: Optional[str] = None + + @validator("s3Bucket", pre=True, always=True) + def validate_bucket(cls, current_value, values): + # Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0') + # or from 's3BucketArn' (invocationSchemaVersion '1.0') + if values.get("s3BucketArn") and not current_value: + # Replace s3Bucket value with the value from s3BucketArn + return values["s3BucketArn"].split(":::")[-1] + return current_value + + +class S3BatchOperationJobModel(BaseModel): + id: str + userArguments: Optional[Dict[str, Any]] = None + + +class S3BatchOperationModel(BaseModel): + invocationId: str + invocationSchemaVersion: Literal["1.0", "2.0"] + job: S3BatchOperationJobModel + tasks: 
List[S3BatchOperationTaskModel] diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index bfd64f8b7ef..4a91d5aa13c 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -191,6 +191,7 @@ Parser comes with the following built-in models: | **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | | **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records | | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload | +| **S3BatchOperationModel** | Lambda Event Source payload for Amazon S3 Batch Operation | | **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. | | **S3Model** | Lambda Event Source payload for Amazon S3 | | **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda | diff --git a/tests/unit/parser/test_s3_batch_operation.py b/tests/unit/parser/test_s3_batch_operation.py new file mode 100644 index 00000000000..9a27ccc4ef1 --- /dev/null +++ b/tests/unit/parser/test_s3_batch_operation.py @@ -0,0 +1,38 @@ +from aws_lambda_powertools.utilities.parser.models import S3BatchOperationModel +from tests.functional.utils import load_event + + +def test_s3_batch_operation_v1_trigger_event(): + raw_event = load_event("s3BatchOperationEventSchemaV1.json") + parsed_event: S3BatchOperationModel = S3BatchOperationModel(**raw_event) + + tasks = list(parsed_event.tasks) + assert len(tasks) == 1 + + assert parsed_event.invocationId == raw_event["invocationId"] + assert parsed_event.invocationSchemaVersion == raw_event["invocationSchemaVersion"] + assert parsed_event.job.id == raw_event["job"]["id"] + + assert tasks[0].taskId == raw_event["tasks"][0]["taskId"] + assert tasks[0].s3Key == raw_event["tasks"][0]["s3Key"] + assert tasks[0].s3VersionId == raw_event["tasks"][0]["s3VersionId"] + assert tasks[0].s3BucketArn == raw_event["tasks"][0]["s3BucketArn"] + assert tasks[0].s3Bucket == "powertools-dataset" + + +def test_s3_batch_operation_v2_trigger_event(): + raw_event = load_event("s3BatchOperationEventSchemaV2.json") + parsed_event: S3BatchOperationModel = S3BatchOperationModel(**raw_event) + + tasks = list(parsed_event.tasks) + assert len(tasks) == 1 + + assert parsed_event.invocationId == raw_event["invocationId"] + assert parsed_event.invocationSchemaVersion == raw_event["invocationSchemaVersion"] + assert parsed_event.job.id == raw_event["job"]["id"] + assert parsed_event.job.userArguments == raw_event["job"]["userArguments"] + + assert tasks[0].taskId == raw_event["tasks"][0]["taskId"] + assert tasks[0].s3Key == raw_event["tasks"][0]["s3Key"] + assert tasks[0].s3VersionId == raw_event["tasks"][0]["s3VersionId"] + assert tasks[0].s3Bucket == raw_event["tasks"][0]["s3Bucket"] diff --git a/tests/unit/parser/test_s3 object_event.py b/tests/unit/parser/test_s3_object_event.py similarity index 100% rename from tests/unit/parser/test_s3 object_event.py rename to tests/unit/parser/test_s3_object_event.py From fe5424f9fe344f572bdeaebc05f34fe0b1265de7 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 16 Jan 2024 16:32:07 +0000 Subject: [PATCH 08/12] Small refactor --- .../utilities/data_classes/s3_batch_operation_event.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py index 7ae6a5fad3e..cdf1de7170a 
100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -3,8 +3,7 @@
 from dataclasses import dataclass, field
 from typing import Any, Dict, Iterator, List, Optional, Tuple
 from urllib.parse import unquote_plus
 
-from typing_extensions import Literal
-
+from aws_lambda_powertools.shared.types import Literal
 from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
 
 # list of valid result codes. Used both in S3BatchOperationResponse and S3BatchOperationResponseRecord

From deb270a598ba4799bb0ff625efd1a620f07f951d Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Wed, 17 Jan 2024 11:50:41 +0000
Subject: [PATCH 09/12] Addressing Ruben's feedback - Docs and examples

---
 .../data_classes/s3_batch_operation_event.py  | 22 ++++++----
 docs/utilities/data_classes.md                | 42 +------------------
 .../event_sources/src/s3_batch_operation.py   | 37 ++++++++++++++++
 3 files changed, 52 insertions(+), 49 deletions(-)
 create mode 100644 examples/event_sources/src/s3_batch_operation.py

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index cdf1de7170a..019856ad2e9 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -51,11 +51,11 @@ class S3BatchOperationResponse:
         The identifier of the invocation request. This must be copied from the event.
 
     treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
-        undocumented parameter, defaults to "PermanentFailure"
+        Undocumented parameter, defaults to "Succeeded"
 
     results : List[S3BatchOperationResponseRecord]
-        results of each S3 Batch Operations task,
-        optional parameter at start. can be added later using `add_result` function.
+        Results of each S3 Batch Operations task,
+        optional parameter at start. Can be added later using `add_result` function.
     Examples
     --------
@@ -77,7 +77,11 @@ class S3BatchOperationResponse:
 
     @event_source(data_class=S3BatchOperationEvent)
     def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
-        response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")
+        response = S3BatchOperationResponse(
+            event.invocation_schema_version,
+            event.invocation_id,
+            "PermanentFailure"
+        )
 
         result = None
         task = event.task
@@ -93,7 +97,7 @@ def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
             error_code = e.response['Error']['Code']
             error_message = e.response['Error']['Message']
             if error_code == 'RequestTimeout':
-                result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
+                result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again")
             else:
                 result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
         except Exception as e:
@@ -102,9 +106,8 @@ def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
             response.add_result(result)
 
         return response.asdict()
 
-    ```
-    """ # noqa: E501
+    """
 
     invocation_schema_version: str
     invocation_id: str
@@ -184,7 +187,7 @@ def build_task_batch_response(
         result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
         result_string: str = "",
     ) -> S3BatchOperationResponseRecord:
-        """Create a S3BatchOperationResponseRecord directly using the record_id and given values
+        """Create a S3BatchOperationResponseRecord directly using the task_id and given values
 
         Parameters
         ----------
@@ -215,7 +218,7 @@ def invocation_id(self) -> str:
 
     @property
     def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
-        """ "
+        """
         Get the schema version for the payload that Batch Operations sends when invoking an
         AWS Lambda function. Either '1.0' or '2.0'.
         """
@@ -223,6 +226,7 @@ def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
 
     @property
     def tasks(self) -> Iterator[S3BatchOperationTask]:
+        """Get s3 batch operation tasks"""
         for task in self["tasks"]:
             yield S3BatchOperationTask(task)
 
diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md
index 63decf7ad49..97b7a5dfda2 100644
--- a/docs/utilities/data_classes.md
+++ b/docs/utilities/data_classes.md
@@ -1083,46 +1083,8 @@ This example is based on the AWS S3 Batch Operations documentation [Example Lamb
 
 === "app.py"
 
-    ```python hl_lines="5-8 13 15 26 31 33 35"
-    import boto3
-
-    from botocore.exceptions import ClientError
-
-    from aws_lambda_powertools.utilities.data_classes import (
-        S3BatchOperationEvent,
-        S3BatchOperationResponse,
-        event_source
-    )
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    @event_source(data_class=S3BatchOperationEvent)
-    def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
-        response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")
-
-        result = None
-        task = event.task
-        src_key: str = task.s3_key
-        src_bucket: str = task.s3_bucket
-
-        s3 = boto3.client("s3", region_name='us-east-1')
-
-        try:
-            dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
-            result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
-        except ClientError as e:
-            error_code = e.response['Error']['Code']
-            error_message = e.response['Error']['Message']
-            if error_code == 'RequestTimeout':
-                result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
-            else:
-                result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
-        except Exception as e:
-            result = task.build_task_batch_response("PermanentFailure", str(e))
-        finally:
-            response.add_result(result)
-
-        return response.asdict()
+    ```python hl_lines="4 8 10 20 25 27 29 33"
+    --8<-- "examples/event_sources/src/s3_batch_operation.py"
     ```
 
 ### S3 Object Lambda
diff --git a/examples/event_sources/src/s3_batch_operation.py b/examples/event_sources/src/s3_batch_operation.py
new file mode 100644
index 00000000000..e292d8cae47
--- /dev/null
+++ b/examples/event_sources/src/s3_batch_operation.py
@@ -0,0 +1,37 @@
+import boto3
+from botocore.exceptions import ClientError
+
+from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+@event_source(data_class=S3BatchOperationEvent)
+def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
+    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")
+
+    task = event.task
+    src_key: str = task.s3_key
+    src_bucket: str = task.s3_bucket
+
+    s3 = boto3.client("s3", region_name="us-east-1")
+
+    try:
+        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
+        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
+    except ClientError as e:
+        error_code = e.response["Error"]["Code"]
+        error_message = e.response["Error"]["Message"]
+        if error_code == "RequestTimeout":
+            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
+        else:
+            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
+    except Exception as e:
+        result = task.build_task_batch_response("PermanentFailure", str(e))
+    finally:
+        response.add_result(result)
+
+    return response.asdict()
+
+
+def do_some_work(s3_client, src_bucket: str, src_key: str):
+    ...

From 12e81d4e2894db105bba69c1602893a9392c6ddb Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Wed, 17 Jan 2024 11:52:10 +0000
Subject: [PATCH 10/12] Addressing Ruben's feedback - Docs and examples

---
 .../utilities/data_classes/s3_batch_operation_event.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index 019856ad2e9..1b344368aa5 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -174,7 +174,7 @@ def s3_bucket_arn(self) -> Optional[str]:
 
     @property
     def s3_bucket(self) -> str:
-        """ "
+        """
         Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
         or from 's3BucketArn' (invocationSchemaVersion '1.0')
         """

From 57681c4a3b19b5f802b28232676928670389f6fc Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Wed, 17 Jan 2024 12:07:14 +0000
Subject: [PATCH 11/12] Addressing Ruben's feedback - Code

---
 .../data_classes/s3_batch_operation_event.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index 1b344368aa5..c62267aa351 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -7,21 +7,22 @@
 from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
 
 # list of valid result code. Used both in S3BatchOperationResponse and S3BatchOperationResponseRecord
-VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
+VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
+RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
 
 
 @dataclass(repr=False, order=False)
 class S3BatchOperationResponseRecord:
     task_id: str
-    result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
+    result_code: RESULT_CODE_TYPE
     result_string: Optional[str] = None
 
     def asdict(self) -> Dict[str, Any]:
-        if self.result_code not in VALID_RESULT_CODE_TYPES:
+        if self.result_code not in VALID_RESULT_CODES:
             warnings.warn(
                 stacklevel=2,
                 message=f"The resultCode {self.result_code} is not valid. "
-                "Choose from 'Ok', 'Dropped' or 'ProcessingFailed '",
+                "Choose from 'Succeeded', 'TemporaryFailure', 'PermanentFailure'",
             )
 
         return {
@@ -111,11 +112,11 @@ def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
     invocation_schema_version: str
     invocation_id: str
-    treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
+    treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded"
     results: List[S3BatchOperationResponseRecord] = field(default_factory=list)
 
     def __post_init__(self):
-        if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES:
+        if self.treat_missing_keys_as not in VALID_RESULT_CODES:
             warnings.warn(
                 stacklevel=2,
                 message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, "
                 "Choose from 'Succeeded', 'TemporaryFailure', 'PermanentFailure'",

From a99594f27087907f558241299e631bb3620693cc Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Wed, 17 Jan 2024 12:32:12 +0000
Subject: [PATCH 12/12] Addressing Ruben's feedback - Code

---
 .../utilities/data_classes/s3_batch_operation_event.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
index c62267aa351..9c742e0c553 100644
--- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -22,7 +22,7 @@ def asdict(self) -> Dict[str, Any]:
             warnings.warn(
                 stacklevel=2,
                 message=f"The resultCode {self.result_code} is not valid. "
-                "Choose from 'Succeeded', 'TemporaryFailure', 'PermanentFailure'",
+                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
             )
 
         return {
@@ -120,7 +120,7 @@ def __post_init__(self):
             warnings.warn(
                 stacklevel=2,
                 message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, "
-                "Choose from 'Succeeded', 'TemporaryFailure', 'PermanentFailure'",
+                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
             )
 
     def add_result(self, result: S3BatchOperationResponseRecord):