diff --git a/Makefile b/Makefile index 0ee0ee76fbd..4cee795ea30 100644 --- a/Makefile +++ b/Makefile @@ -90,3 +90,10 @@ changelog: mypy: poetry run mypy --pretty aws_lambda_powertools + +format-examples: + poetry run isort docs/examples + poetry run black docs/examples/*/*/*.py + +lint-examples: + poetry run python3 -m py_compile docs/examples/*/*/*.py diff --git a/docs/examples/utilities/data_classes/app_active_mq.py b/docs/examples/utilities/data_classes/app_active_mq.py new file mode 100644 index 00000000000..fd169637f35 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_active_mq.py @@ -0,0 +1,15 @@ +from typing import Dict + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent + +logger = Logger() + + +@event_source(data_class=ActiveMQEvent) +def lambda_handler(event: ActiveMQEvent, context): + for message in event.messages: + logger.debug(f"MessageID: {message.message_id}") + data: Dict = message.json_data + logger.debug("Process json in base64 encoded data str", data) diff --git a/docs/examples/utilities/data_classes/app_alb.py b/docs/examples/utilities/data_classes/app_alb.py new file mode 100644 index 00000000000..3703b02a997 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_alb.py @@ -0,0 +1,7 @@ +from aws_lambda_powertools.utilities.data_classes import ALBEvent, event_source + + +@event_source(data_class=ALBEvent) +def lambda_handler(event: ALBEvent, context): + if "helloworld" in event.path and event.http_method == "POST": + do_something_with(event.json_body, event.query_string_parameters) diff --git a/docs/examples/utilities/data_classes/app_appsync_authorizer.py b/docs/examples/utilities/data_classes/app_appsync_authorizer.py new file mode 100644 index 00000000000..012f7beb016 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_appsync_authorizer.py @@ -0,0 +1,33 @@ +from typing import Dict + +from aws_lambda_powertools.logging import correlation_paths +from aws_lambda_powertools.logging.logger import Logger +from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import ( + AppSyncAuthorizerEvent, + AppSyncAuthorizerResponse, +) +from aws_lambda_powertools.utilities.data_classes.event_source import event_source + +logger = Logger() + + +def get_user_by_token(token: str): + """Look a user by token""" + ... + + +@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER) +@event_source(data_class=AppSyncAuthorizerEvent) +def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict: + user = get_user_by_token(event.authorization_token) + + if not user: + # No user found, return not authorized + return AppSyncAuthorizerResponse().asdict() + + return AppSyncAuthorizerResponse( + authorize=True, + resolver_context={"id": user.id}, + # Only allow admins to delete events + deny_fields=None if user.is_admin else ["Mutation.deleteEvent"], + ).asdict() diff --git a/docs/examples/utilities/data_classes/app_appsync_resolver.py b/docs/examples/utilities/data_classes/app_appsync_resolver.py new file mode 100644 index 00000000000..25d39e96829 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_appsync_resolver.py @@ -0,0 +1,36 @@ +from aws_lambda_powertools.logging import Logger, correlation_paths +from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import ( + AppSyncIdentityCognito, + AppSyncResolverEvent, +) + +logger = Logger() + + +def get_locations(name: str = None, size: int = 0, page: int = 0): + """Your resolver logic here""" + + +@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) +def lambda_handler(event, context): + event: AppSyncResolverEvent = AppSyncResolverEvent(event) + + # Case insensitive look up of request headers + x_forwarded_for = event.get_header_value("x-forwarded-for") + + # Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types + assert isinstance(event.identity, AppSyncIdentityCognito) + identity: AppSyncIdentityCognito = event.identity + + # Logging with correlation_id + logger.debug( + { + "x-forwarded-for": x_forwarded_for, + "username": identity.username, + } + ) + + if event.type_name == "Merchant" and event.field_name == "locations": + return get_locations(**event.arguments) + + raise ValueError(f"Unsupported field resolver: {event.field_name}") diff --git a/docs/examples/utilities/data_classes/app_cloudwatch_logs.py b/docs/examples/utilities/data_classes/app_cloudwatch_logs.py new file mode 100644 index 00000000000..88008c87b59 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_cloudwatch_logs.py @@ -0,0 +1,10 @@ +from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent, event_source +from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData + + +@event_source(data_class=CloudWatchLogsEvent) +def lambda_handler(event: CloudWatchLogsEvent, context): + decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data + log_events = decompressed_log.log_events + for event in log_events: + do_something_with(event.timestamp, event.message) diff --git a/docs/examples/utilities/data_classes/app_codepipeline_job.py b/docs/examples/utilities/data_classes/app_codepipeline_job.py new file mode 100644 index 00000000000..eb4c2125f56 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_codepipeline_job.py @@ -0,0 +1,43 @@ +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source + +logger = Logger() + + +@event_source(data_class=CodePipelineJobEvent) +def lambda_handler(event, context): + """The Lambda function handler + + If a continuing job then checks the CloudFormation stack status + and updates the job accordingly. + + If a new job then kick of an update or creation of the target + CloudFormation stack. + """ + + # Extract the Job ID + job_id = event.get_id + + # Extract the params + params: dict = event.decoded_user_parameters + stack = params["stack"] + artifact_name = params["artifact"] + template_file = params["file"] + + try: + if event.data.continuation_token: + # If we're continuing then the create/update has already been triggered + # we just need to check if it has finished. + check_stack_update_status(job_id, stack) + else: + template = event.get_artifact(artifact_name, template_file) + # Kick off a stack update or create + start_update_or_create(job_id, stack, template) + except Exception as e: + # If any other exceptions which we didn't expect are raised + # then fail the job and log the exception message. + logger.exception("Function failed due to exception.") + put_job_failure(job_id, "Function exception: " + str(e)) + + logger.debug("Function complete.") + return "Complete." diff --git a/docs/examples/utilities/data_classes/app_cognito_create_auth_challenge.py b/docs/examples/utilities/data_classes/app_cognito_create_auth_challenge.py new file mode 100644 index 00000000000..9f57743f053 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_cognito_create_auth_challenge.py @@ -0,0 +1,11 @@ +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent + + +@event_source(data_class=CreateAuthChallengeTriggerEvent) +def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict: + if event.request.challenge_name == "CUSTOM_CHALLENGE": + event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"} + event.response.private_challenge_parameters = {"answer": "5"} + event.response.challenge_metadata = "CAPTCHA_CHALLENGE" + return event.raw_event diff --git a/docs/examples/utilities/data_classes/app_cognito_define_auth_challenge.py b/docs/examples/utilities/data_classes/app_cognito_define_auth_challenge.py new file mode 100644 index 00000000000..e7efad058a9 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_cognito_define_auth_challenge.py @@ -0,0 +1,29 @@ +from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent + + +def handler(event: dict, context) -> dict: + event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event) + if len(event.request.session) == 1 and event.request.session[0].challenge_name == "SRP_A": + event.response.issue_tokens = False + event.response.fail_authentication = False + event.response.challenge_name = "PASSWORD_VERIFIER" + elif ( + len(event.request.session) == 2 + and event.request.session[1].challenge_name == "PASSWORD_VERIFIER" + and event.request.session[1].challenge_result + ): + event.response.issue_tokens = False + event.response.fail_authentication = False + event.response.challenge_name = "CUSTOM_CHALLENGE" + elif ( + len(event.request.session) == 3 + and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE" + and event.request.session[2].challenge_result + ): + event.response.issue_tokens = True + event.response.fail_authentication = False + else: + event.response.issue_tokens = False + event.response.fail_authentication = True + + return event.raw_event diff --git a/docs/examples/utilities/data_classes/app_cognito_post_confirmation.py b/docs/examples/utilities/data_classes/app_cognito_post_confirmation.py new file mode 100644 index 00000000000..bae89629a0c --- /dev/null +++ b/docs/examples/utilities/data_classes/app_cognito_post_confirmation.py @@ -0,0 +1,8 @@ +from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent + + +def lambda_handler(event, context): + event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event) + + user_attributes = event.request.user_attributes + do_something_with(user_attributes) diff --git a/docs/examples/utilities/data_classes/app_cognito_verify_auth_challenge_response.py b/docs/examples/utilities/data_classes/app_cognito_verify_auth_challenge_response.py new file mode 100644 index 00000000000..95b96c4d2f9 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_cognito_verify_auth_challenge_response.py @@ -0,0 +1,10 @@ +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent + + +@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent) +def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict: + event.response.answer_correct = ( + event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer + ) + return event.raw_event diff --git a/docs/examples/utilities/data_classes/app_connect_contact_flow.py b/docs/examples/utilities/data_classes/app_connect_contact_flow.py new file mode 100644 index 00000000000..53d120a4c4b --- /dev/null +++ b/docs/examples/utilities/data_classes/app_connect_contact_flow.py @@ -0,0 +1,14 @@ +from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import ( + ConnectContactFlowChannel, + ConnectContactFlowEndpointType, + ConnectContactFlowEvent, + ConnectContactFlowInitiationMethod, +) + + +def lambda_handler(event, context): + event: ConnectContactFlowEvent = ConnectContactFlowEvent(event) + assert event.contact_data.attributes == {"Language": "en-US"} + assert event.contact_data.channel == ConnectContactFlowChannel.VOICE + assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER + assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API diff --git a/docs/examples/utilities/data_classes/app_dynamodb.py b/docs/examples/utilities/data_classes/app_dynamodb.py new file mode 100644 index 00000000000..e9bc49da03e --- /dev/null +++ b/docs/examples/utilities/data_classes/app_dynamodb.py @@ -0,0 +1,14 @@ +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( + DynamoDBRecordEventName, + DynamoDBStreamEvent, +) + + +def lambda_handler(event, context): + event: DynamoDBStreamEvent = DynamoDBStreamEvent(event) + + # Multiple records can be delivered in a single event + for record in event.records: + if record.event_name == DynamoDBRecordEventName.MODIFY: + do_something_with(record.dynamodb.new_image) + do_something_with(record.dynamodb.old_image) diff --git a/docs/examples/utilities/data_classes/app_dynamodb_multiple_records_types.py b/docs/examples/utilities/data_classes/app_dynamodb_multiple_records_types.py new file mode 100644 index 00000000000..53b38918260 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_dynamodb_multiple_records_types.py @@ -0,0 +1,16 @@ +from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import AttributeValue, AttributeValueType +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@event_source(data_class=DynamoDBStreamEvent) +def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): + for record in event.records: + key: AttributeValue = record.dynamodb.keys["id"] + if key == AttributeValueType.Number: + # {"N": "123.45"} => "123.45" + assert key.get_value == key.n_value + print(key.get_value) + elif key == AttributeValueType.Map: + assert key.get_value == key.map_value + print(key.get_value) diff --git a/docs/examples/utilities/data_classes/app_event_bridge.py b/docs/examples/utilities/data_classes/app_event_bridge.py new file mode 100644 index 00000000000..6570add4578 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_event_bridge.py @@ -0,0 +1,6 @@ +from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent, event_source + + +@event_source(data_class=EventBridgeEvent) +def lambda_handler(event: EventBridgeEvent, context): + do_something_with(event.detail) diff --git a/docs/examples/utilities/data_classes/app_http_api.py b/docs/examples/utilities/data_classes/app_http_api.py new file mode 100644 index 00000000000..970c4ce6f73 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_http_api.py @@ -0,0 +1,7 @@ +from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source + + +@event_source(data_class=APIGatewayProxyEventV2) +def lambda_handler(event: APIGatewayProxyEventV2, context): + if "helloworld" in event.path and event.http_method == "POST": + do_something_with(event.json_body, event.query_string_parameters) diff --git a/docs/examples/utilities/data_classes/app_http_api_authorizer.py b/docs/examples/utilities/data_classes/app_http_api_authorizer.py new file mode 100644 index 00000000000..a067e0179a0 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_http_api_authorizer.py @@ -0,0 +1,25 @@ +from secrets import compare_digest + +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerEventV2, + APIGatewayAuthorizerResponseV2, +) + + +def get_user_by_token(token): + if compare_digest(token, "Foo"): + return {"name": "Foo"} + return None + + +@event_source(data_class=APIGatewayAuthorizerEventV2) +def handler(event: APIGatewayAuthorizerEventV2, context): + user = get_user_by_token(event.get_header_value("x-token")) + + if user is None: + # No user was found, so we return not authorized + return APIGatewayAuthorizerResponseV2().asdict() + + # Found the user and setting the details in the context + return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict() diff --git a/docs/examples/utilities/data_classes/app_kinesis_data_streams.py b/docs/examples/utilities/data_classes/app_kinesis_data_streams.py new file mode 100644 index 00000000000..77a6fcfeb7a --- /dev/null +++ b/docs/examples/utilities/data_classes/app_kinesis_data_streams.py @@ -0,0 +1,14 @@ +from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source + + +@event_source(data_class=KinesisStreamEvent) +def lambda_handler(event: KinesisStreamEvent, context): + kinesis_record = next(event.records).kinesis + + # if data was delivered as text + data = kinesis_record.data_as_text() + + # if data was delivered as json + data = kinesis_record.data_as_json() + + do_something_with(data) diff --git a/docs/examples/utilities/data_classes/app_rabbit_mq.py b/docs/examples/utilities/data_classes/app_rabbit_mq.py new file mode 100644 index 00000000000..e4d241b1bdb --- /dev/null +++ b/docs/examples/utilities/data_classes/app_rabbit_mq.py @@ -0,0 +1,17 @@ +from typing import Dict + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent + +logger = Logger() + + +@event_source(data_class=RabbitMQEvent) +def lambda_handler(event: RabbitMQEvent, context): + for queue_name, messages in event.rmq_messages_by_queue.items(): + logger.debug(f"Messages for queue: {queue_name}") + for message in messages: + logger.debug(f"MessageID: {message.basic_properties.message_id}") + data: Dict = message.json_data + logger.debug("Process json in base64 encoded data str", data) diff --git a/docs/examples/utilities/data_classes/app_rest_api.py b/docs/examples/utilities/data_classes/app_rest_api.py new file mode 100644 index 00000000000..21921a6c09a --- /dev/null +++ b/docs/examples/utilities/data_classes/app_rest_api.py @@ -0,0 +1,10 @@ +from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source + + +@event_source(data_class=APIGatewayProxyEvent) +def lambda_handler(event: APIGatewayProxyEvent, context): + if "helloworld" in event.path and event.http_method == "GET": + request_context = event.request_context + identity = request_context.identity + user = identity.user + do_something_with(event.json_body, user) diff --git a/docs/examples/utilities/data_classes/app_rest_api_type_request.py b/docs/examples/utilities/data_classes/app_rest_api_type_request.py new file mode 100644 index 00000000000..d7a96bdd4bf --- /dev/null +++ b/docs/examples/utilities/data_classes/app_rest_api_type_request.py @@ -0,0 +1,52 @@ +from secrets import compare_digest + +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + DENY_ALL_RESPONSE, + APIGatewayAuthorizerRequestEvent, + APIGatewayAuthorizerResponse, + HttpVerb, +) + + +def get_user_by_token(token): + if compare_digest(token, "admin-foo"): + return {"id": 0, "name": "Admin", "isAdmin": True} + elif compare_digest(token, "regular-foo"): + return {"id": 1, "name": "Joe"} + else: + return None + + +@event_source(data_class=APIGatewayAuthorizerRequestEvent) +def handler(event: APIGatewayAuthorizerRequestEvent, context): + user = get_user_by_token(event.get_header_value("Authorization")) + + if user is None: + # No user was found + # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics + # raise Exception("Unauthorized") + # to return 403 - `{"message":"Forbidden"}` + return DENY_ALL_RESPONSE + + # parse the `methodArn` as an `APIGatewayRouteArn` + arn = event.parsed_arn + + # Create the response builder from parts of the `methodArn` + # and set the logged in user id and context + policy = APIGatewayAuthorizerResponse( + principal_id=user["id"], + context=user, + region=arn.region, + aws_account_id=arn.aws_account_id, + api_id=arn.api_id, + stage=arn.stage, + ) + + # Conditional IAM Policy + if user.get("isAdmin", False): + policy.allow_all_routes() + else: + policy.allow_route(HttpVerb.GET, "/user-profile") + + return policy.asdict() diff --git a/docs/examples/utilities/data_classes/app_rest_api_type_token.py b/docs/examples/utilities/data_classes/app_rest_api_type_token.py new file mode 100644 index 00000000000..d3af20957e1 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_rest_api_type_token.py @@ -0,0 +1,24 @@ +from aws_lambda_powertools.utilities.data_classes import event_source +from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerResponse, + APIGatewayAuthorizerTokenEvent, +) + + +@event_source(data_class=APIGatewayAuthorizerTokenEvent) +def handler(event: APIGatewayAuthorizerTokenEvent, context): + arn = event.parsed_arn + + policy = APIGatewayAuthorizerResponse( + principal_id="user", + region=arn.region, + aws_account_id=arn.aws_account_id, + api_id=arn.api_id, + stage=arn.stage, + ) + + if event.authorization_token == "42": + policy.allow_all_routes() + else: + policy.deny_all_routes() + return policy.asdict() diff --git a/docs/examples/utilities/data_classes/app_s3.py b/docs/examples/utilities/data_classes/app_s3.py new file mode 100644 index 00000000000..42c7e847ce8 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_s3.py @@ -0,0 +1,14 @@ +from urllib.parse import unquote_plus + +from aws_lambda_powertools.utilities.data_classes import S3Event, event_source + + +@event_source(data_class=S3Event) +def lambda_handler(event: S3Event, context): + bucket_name = event.bucket_name + + # Multiple records can be delivered in a single event + for record in event.records: + object_key = unquote_plus(record.s3.get_object.key) + + do_something_with(f"{bucket_name}/{object_key}") diff --git a/docs/examples/utilities/data_classes/app_s3_object_lambda.py b/docs/examples/utilities/data_classes/app_s3_object_lambda.py new file mode 100644 index 00000000000..7b27e766ad7 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_s3_object_lambda.py @@ -0,0 +1,31 @@ +import boto3 +import requests + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA +from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent + +logger = Logger() +session = boto3.Session() +s3 = session.client("s3") + + +@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True) +def lambda_handler(event, context): + event = S3ObjectLambdaEvent(event) + + # Get object from S3 + response = requests.get(event.input_s3_url) + original_object = response.content.decode("utf-8") + + # Make changes to the object about to be returned + transformed_object = original_object.upper() + + # Write object back to S3 Object Lambda + s3.write_get_object_response( + Body=transformed_object, + RequestRoute=event.request_route, + RequestToken=event.request_token, + ) + + return {"status_code": 200} diff --git a/docs/examples/utilities/data_classes/app_ses.py b/docs/examples/utilities/data_classes/app_ses.py new file mode 100644 index 00000000000..94982887857 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_ses.py @@ -0,0 +1,11 @@ +from aws_lambda_powertools.utilities.data_classes import SESEvent, event_source + + +@event_source(data_class=SESEvent) +def lambda_handler(event: SESEvent, context): + # Multiple records can be delivered in a single event + for record in event.records: + mail = record.ses.mail + common_headers = mail.common_headers + + do_something_with(common_headers.to, common_headers.subject) diff --git a/docs/examples/utilities/data_classes/app_sns.py b/docs/examples/utilities/data_classes/app_sns.py new file mode 100644 index 00000000000..08b04c82504 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_sns.py @@ -0,0 +1,11 @@ +from aws_lambda_powertools.utilities.data_classes import SNSEvent, event_source + + +@event_source(data_class=SNSEvent) +def lambda_handler(event: SNSEvent, context): + # Multiple records can be delivered in a single event + for record in event.records: + message = record.sns.message + subject = record.sns.subject + + do_something_with(subject, message) diff --git a/docs/examples/utilities/data_classes/app_sqs.py b/docs/examples/utilities/data_classes/app_sqs.py new file mode 100644 index 00000000000..d5031150558 --- /dev/null +++ b/docs/examples/utilities/data_classes/app_sqs.py @@ -0,0 +1,8 @@ +from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source + + +@event_source(data_class=SQSEvent) +def lambda_handler(event: SQSEvent, context): + # Multiple records can be delivered in a single event + for record in event.records: + do_something_with(record.body) diff --git a/docs/examples/utilities/data_classes/using_data_classes.py b/docs/examples/utilities/data_classes/using_data_classes.py new file mode 100644 index 00000000000..004379acdae --- /dev/null +++ b/docs/examples/utilities/data_classes/using_data_classes.py @@ -0,0 +1,7 @@ +from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent + + +def lambda_handler(event: dict, context): + event = APIGatewayProxyEvent(event) + if "helloworld" in event.path and event.http_method == "GET": + do_something_with(event.body, user) diff --git a/docs/examples/utilities/data_classes/using_data_classes_event_source.py b/docs/examples/utilities/data_classes/using_data_classes_event_source.py new file mode 100644 index 00000000000..d1688e5f898 --- /dev/null +++ b/docs/examples/utilities/data_classes/using_data_classes_event_source.py @@ -0,0 +1,7 @@ +from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source + + +@event_source(data_class=APIGatewayProxyEvent) +def lambda_handler(event: APIGatewayProxyEvent, context): + if "helloworld" in event.path and event.http_method == "GET": + do_something_with(event.body, user) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 8353d904bb1..c317c149342 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -29,27 +29,18 @@ For example, if your Lambda function is being triggered by an API Gateway proxy === "app.py" - ```python hl_lines="1 4" - from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent - - def lambda_handler(event: dict, context): - event = APIGatewayProxyEvent(event) - if 'helloworld' in event.path and event.http_method == 'GET': - do_something_with(event.body, user) + ```python hl_lines="1 5" + --8<-- "docs/examples/utilities/data_classes/using_data_classes.py" ``` Same example as above, but using the `event_source` decorator === "app.py" - ```python hl_lines="1 3" - from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent - - @event_source(data_class=APIGatewayProxyEvent) - def lambda_handler(event: APIGatewayProxyEvent, context): - if 'helloworld' in event.path and event.http_method == 'GET': - do_something_with(event.body, user) + ```python hl_lines="1 4" + --8<-- "docs/examples/utilities/data_classes/using_data_classes_event_source.py" ``` + **Autocomplete with self-documented properties and methods** ![Utilities Data Classes](../media/utilities_data_classes.png) @@ -92,21 +83,8 @@ for more details. === "app.py" - ```python hl_lines="4-5 9-10" - from typing import Dict - - from aws_lambda_powertools import Logger - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent - - logger = Logger() - - @event_source(data_class=ActiveMQEvent) - def lambda_handler(event: ActiveMQEvent, context): - for message in event.messages: - logger.debug(f"MessageID: {message.message_id}") - data: Dict = message.json_data - logger.debug("Process json in base64 encoded data str", data) + ```python hl_lines="4-5 10-11" + --8<-- "docs/examples/utilities/data_classes/app_active_mq.py" ``` ### API Gateway Authorizer @@ -123,86 +101,13 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users. - ```python hl_lines="2-6 29 36-42 47 49" - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( - DENY_ALL_RESPONSE, - APIGatewayAuthorizerRequestEvent, - APIGatewayAuthorizerResponse, - HttpVerb, - ) - from secrets import compare_digest - - - def get_user_by_token(token): - if compare_digest(token, "admin-foo"): - return {"id": 0, "name": "Admin", "isAdmin": True} - elif compare_digest(token, "regular-foo"): - return {"id": 1, "name": "Joe"} - else: - return None - - - @event_source(data_class=APIGatewayAuthorizerRequestEvent) - def handler(event: APIGatewayAuthorizerRequestEvent, context): - user = get_user_by_token(event.get_header_value("Authorization")) - - if user is None: - # No user was found - # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics - # raise Exception("Unauthorized") - # to return 403 - `{"message":"Forbidden"}` - return DENY_ALL_RESPONSE - - # parse the `methodArn` as an `APIGatewayRouteArn` - arn = event.parsed_arn - - # Create the response builder from parts of the `methodArn` - # and set the logged in user id and context - policy = APIGatewayAuthorizerResponse( - principal_id=user["id"], - context=user, - region=arn.region, - aws_account_id=arn.aws_account_id, - api_id=arn.api_id, - stage=arn.stage, - ) - - # Conditional IAM Policy - if user.get("isAdmin", False): - policy.allow_all_routes() - else: - policy.allow_route(HttpVerb.GET, "/user-profile") - - return policy.asdict() + ```python hl_lines="4-9 30 37-44 48 50 52" + --8<-- "docs/examples/utilities/data_classes/app_rest_api_type_request.py" ``` === "app_type_token.py" ```python hl_lines="2-5 12-18 21 23-24" - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( - APIGatewayAuthorizerTokenEvent, - APIGatewayAuthorizerResponse, - ) - - - @event_source(data_class=APIGatewayAuthorizerTokenEvent) - def handler(event: APIGatewayAuthorizerTokenEvent, context): - arn = event.parsed_arn - - policy = APIGatewayAuthorizerResponse( - principal_id="user", - region=arn.region, - aws_account_id=arn.aws_account_id, - api_id=arn.api_id, - stage=arn.stage - ) - - if event.authorization_token == "42": - policy.allow_all_routes() - else: - policy.deny_all_routes() - return policy.asdict() + --8<-- "docs/examples/utilities/data_classes/app_rest_api_type_token.py" ``` ### API Gateway Authorizer V2 @@ -216,31 +121,8 @@ See also [this blog post](https://aws.amazon.com/blogs/compute/introducing-iam-a This example looks up user details via `x-token` header. It uses `APIGatewayAuthorizerResponseV2` to return a deny policy when user is not found or authorized. - ```python hl_lines="2-5 21 24" - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( - APIGatewayAuthorizerEventV2, - APIGatewayAuthorizerResponseV2, - ) - from secrets import compare_digest - - - def get_user_by_token(token): - if compare_digest(token, "Foo"): - return {"name": "Foo"} - return None - - - @event_source(data_class=APIGatewayAuthorizerEventV2) - def handler(event: APIGatewayAuthorizerEventV2, context): - user = get_user_by_token(event.get_header_value("x-token")) - - if user is None: - # No user was found, so we return not authorized - return APIGatewayAuthorizerResponseV2().asdict() - - # Found the user and setting the details in the context - return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict() + ```python hl_lines="4-7 21 24" + --8<-- "docs/examples/utilities/data_classes/app_http_api_authorizer.py" ``` ### API Gateway Proxy @@ -250,15 +132,7 @@ It is used for either API Gateway REST API or HTTP API using v1 proxy event. === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent - - @event_source(data_class=APIGatewayProxyEvent) - def lambda_handler(event: APIGatewayProxyEvent, context): - if "helloworld" in event.path and event.http_method == "GET": - request_context = event.request_context - identity = request_context.identity - user = identity.user - do_something_with(event.json_body, user) + --8<-- "docs/examples/utilities/data_classes/app_rest_api.py" ``` ### API Gateway Proxy V2 @@ -268,12 +142,7 @@ It is used for HTTP API using v2 proxy event. === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEventV2 - - @event_source(data_class=APIGatewayProxyEventV2) - def lambda_handler(event: APIGatewayProxyEventV2, context): - if "helloworld" in event.path and event.http_method == "POST": - do_something_with(event.json_body, event.query_string_parameters) + --8<-- "docs/examples/utilities/data_classes/app_http_api.py" ``` ### Application Load Balancer @@ -283,12 +152,7 @@ Is it used for Application load balancer event. === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, ALBEvent - - @event_source(data_class=ALBEvent) - def lambda_handler(event: ALBEvent, context): - if "helloworld" in event.path and event.http_method == "POST": - do_something_with(event.json_body, event.query_string_parameters) + --8<-- "docs/examples/utilities/data_classes/app_alb.py" ``` ### AppSync Authorizer @@ -304,39 +168,7 @@ In this example extract the `requestId` as the `correlation_id` for logging, use === "app.py" ```python - from typing import Dict - - from aws_lambda_powertools.logging import correlation_paths - from aws_lambda_powertools.logging.logger import Logger - from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import ( - AppSyncAuthorizerEvent, - AppSyncAuthorizerResponse, - ) - from aws_lambda_powertools.utilities.data_classes.event_source import event_source - - logger = Logger() - - - def get_user_by_token(token: str): - """Look a user by token""" - ... - - - @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER) - @event_source(data_class=AppSyncAuthorizerEvent) - def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict: - user = get_user_by_token(event.authorization_token) - - if not user: - # No user found, return not authorized - return AppSyncAuthorizerResponse().asdict() - - return AppSyncAuthorizerResponse( - authorize=True, - resolver_context={"id": user.id}, - # Only allow admins to delete events - deny_fields=None if user.is_admin else ["Mutation.deleteEvent"], - ).asdict() + --8<-- "docs/examples/utilities/data_classes/app_appsync_authorizer.py" ``` ### AppSync Resolver @@ -350,40 +182,8 @@ In this example, we also use the new Logger `correlation_id` and built-in `corre === "app.py" - ```python hl_lines="2-5 12 14 19 21 29-30" - from aws_lambda_powertools.logging import Logger, correlation_paths - from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import ( - AppSyncResolverEvent, - AppSyncIdentityCognito - ) - - logger = Logger() - - def get_locations(name: str = None, size: int = 0, page: int = 0): - """Your resolver logic here""" - - @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) - def lambda_handler(event, context): - event: AppSyncResolverEvent = AppSyncResolverEvent(event) - - # Case insensitive look up of request headers - x_forwarded_for = event.get_header_value("x-forwarded-for") - - # Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types - assert isinstance(event.identity, AppSyncIdentityCognito) - identity: AppSyncIdentityCognito = event.identity - - # Logging with correlation_id - logger.debug({ - "x-forwarded-for": x_forwarded_for, - "username": identity.username - }) - - if event.type_name == "Merchant" and event.field_name == "locations": - return get_locations(**event.arguments) - - raise ValueError(f"Unsupported field resolver: {event.field_name}") - + ```python hl_lines="2-5 14 16 21 23 33-34" + --8<-- "docs/examples/utilities/data_classes/app_appsync_resolver.py" ``` === "Example AppSync Event" @@ -446,15 +246,7 @@ decompress and parse json data from the event. === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchLogsEvent - from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData - - @event_source(data_class=CloudWatchLogsEvent) - def lambda_handler(event: CloudWatchLogsEvent, context): - decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data - log_events = decompressed_log.log_events - for event in log_events: - do_something_with(event.timestamp, event.message) + --8<-- "docs/examples/utilities/data_classes/app_cloudwatch_logs.py" ``` ### CodePipeline Job @@ -464,48 +256,7 @@ Data classes and utility functions to help create continuous delivery pipelines === "app.py" ```python - from aws_lambda_powertools import Logger - from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent - - logger = Logger() - - @event_source(data_class=CodePipelineJobEvent) - def lambda_handler(event, context): - """The Lambda function handler - - If a continuing job then checks the CloudFormation stack status - and updates the job accordingly. - - If a new job then kick of an update or creation of the target - CloudFormation stack. - """ - - # Extract the Job ID - job_id = event.get_id - - # Extract the params - params: dict = event.decoded_user_parameters - stack = params["stack"] - artifact_name = params["artifact"] - template_file = params["file"] - - try: - if event.data.continuation_token: - # If we're continuing then the create/update has already been triggered - # we just need to check if it has finished. - check_stack_update_status(job_id, stack) - else: - template = event.get_artifact(artifact_name, template_file) - # Kick off a stack update or create - start_update_or_create(job_id, stack, template) - except Exception as e: - # If any other exceptions which we didn't expect are raised - # then fail the job and log the exception message. - logger.exception("Function failed due to exception.") - put_job_failure(job_id, "Function exception: " + str(e)) - - logger.debug("Function complete.") - return "Complete." + --8<-- "docs/examples/utilities/data_classes/app_codepipeline_job.py" ``` ### Cognito User Pool @@ -531,13 +282,7 @@ Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChalleng === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent - - def lambda_handler(event, context): - event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event) - - user_attributes = event.request.user_attributes - do_something_with(user_attributes) + --8<-- "docs/examples/utilities/data_classes/app_cognito_post_confirmation.py" ``` #### Define Auth Challenge Example @@ -550,37 +295,7 @@ This example is based on the AWS Cognito docs for [Define Auth Challenge Lambda === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent - - def handler(event: dict, context) -> dict: - event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event) - if ( - len(event.request.session) == 1 - and event.request.session[0].challenge_name == "SRP_A" - ): - event.response.issue_tokens = False - event.response.fail_authentication = False - event.response.challenge_name = "PASSWORD_VERIFIER" - elif ( - len(event.request.session) == 2 - and event.request.session[1].challenge_name == "PASSWORD_VERIFIER" - and event.request.session[1].challenge_result - ): - event.response.issue_tokens = False - event.response.fail_authentication = False - event.response.challenge_name = "CUSTOM_CHALLENGE" - elif ( - len(event.request.session) == 3 - and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE" - and event.request.session[2].challenge_result - ): - event.response.issue_tokens = True - event.response.fail_authentication = False - else: - event.response.issue_tokens = False - event.response.fail_authentication = True - - return event.raw_event + --8<-- "docs/examples/utilities/data_classes/app_cognito_define_auth_challenge.py" ``` === "SPR_A response" @@ -704,16 +419,7 @@ This example is based on the AWS Cognito docs for [Create Auth Challenge Lambda === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent - - @event_source(data_class=CreateAuthChallengeTriggerEvent) - def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict: - if event.request.challenge_name == "CUSTOM_CHALLENGE": - event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"} - event.response.private_challenge_parameters = {"answer": "5"} - event.response.challenge_metadata = "CAPTCHA_CHALLENGE" - return event.raw_event + --8<-- "docs/examples/utilities/data_classes/app_cognito_create_auth_challenge.py" ``` #### Verify Auth Challenge Response Example @@ -723,15 +429,7 @@ This example is based on the AWS Cognito docs for [Verify Auth Challenge Respons === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent - - @event_source(data_class=VerifyAuthChallengeResponseTriggerEvent) - def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict: - event.response.answer_correct = ( - event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer - ) - return event.raw_event + --8<-- "docs/examples/utilities/data_classes/app_cognito_verify_auth_challenge_response.py" ``` ### Connect Contact Flow @@ -741,19 +439,7 @@ This example is based on the AWS Cognito docs for [Verify Auth Challenge Respons === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import ( - ConnectContactFlowChannel, - ConnectContactFlowEndpointType, - ConnectContactFlowEvent, - ConnectContactFlowInitiationMethod, - ) - - def lambda_handler(event, context): - event: ConnectContactFlowEvent = ConnectContactFlowEvent(event) - assert event.contact_data.attributes == {"Language": "en-US"} - assert event.contact_data.channel == ConnectContactFlowChannel.VOICE - assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER - assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API + --8<-- "docs/examples/utilities/data_classes/app_connect_contact_flow.py" ``` ### DynamoDB Streams @@ -765,40 +451,13 @@ attributes values (`AttributeValue`), as well as enums for stream view type (`St === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( - DynamoDBStreamEvent, - DynamoDBRecordEventName - ) - - def lambda_handler(event, context): - event: DynamoDBStreamEvent = DynamoDBStreamEvent(event) - - # Multiple records can be delivered in a single event - for record in event.records: - if record.event_name == DynamoDBRecordEventName.MODIFY: - do_something_with(record.dynamodb.new_image) - do_something_with(record.dynamodb.old_image) + --8<-- "docs/examples/utilities/data_classes/app_dynamodb.py" ``` === "multiple_records_types.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent - from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import AttributeValueType, AttributeValue - from aws_lambda_powertools.utilities.typing import LambdaContext - - - @event_source(data_class=DynamoDBStreamEvent) - def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): - for record in event.records: - key: AttributeValue = record.dynamodb.keys["id"] - if key == AttributeValueType.Number: - # {"N": "123.45"} => "123.45" - assert key.get_value == key.n_value - print(key.get_value) - elif key == AttributeValueType.Map: - assert key.get_value == key.map_value - print(key.get_value) + --8<-- "docs/examples/utilities/data_classes/app_dynamodb_multiple_records_types.py" ``` ### EventBridge @@ -806,12 +465,7 @@ attributes values (`AttributeValue`), as well as enums for stream view type (`St === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, EventBridgeEvent - - @event_source(data_class=EventBridgeEvent) - def lambda_handler(event: EventBridgeEvent, context): - do_something_with(event.detail) - + --8<-- "docs/examples/utilities/data_classes/app_event_bridge.py" ``` ### Kinesis streams @@ -822,19 +476,7 @@ or plain text, depending on the original payload. === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent - - @event_source(data_class=KinesisStreamEvent) - def lambda_handler(event: KinesisStreamEvent, context): - kinesis_record = next(event.records).kinesis - - # if data was delivered as text - data = kinesis_record.data_as_text() - - # if data was delivered as json - data = kinesis_record.data_as_json() - - do_something_with(data) + --8<-- "docs/examples/utilities/data_classes/app_kinesis_data_streams.py" ``` ### Rabbit MQ @@ -845,23 +487,8 @@ for more details. === "app.py" - ```python hl_lines="4-5 9-10" - from typing import Dict - - from aws_lambda_powertools import Logger - from aws_lambda_powertools.utilities.data_classes import event_source - from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent - - logger = Logger() - - @event_source(data_class=RabbitMQEvent) - def lambda_handler(event: RabbitMQEvent, context): - for queue_name, messages in event.rmq_messages_by_queue.items(): - logger.debug(f"Messages for queue: {queue_name}") - for message in messages: - logger.debug(f"MessageID: {message.basic_properties.message_id}") - data: Dict = message.json_data - logger.debug("Process json in base64 encoded data str", data) + ```python hl_lines="4-5 10-11" + --8<-- "docs/examples/utilities/data_classes/app_rabbit_mq.py" ``` ### S3 @@ -869,18 +496,7 @@ for more details. === "app.py" ```python - from urllib.parse import unquote_plus - from aws_lambda_powertools.utilities.data_classes import event_source, S3Event - - @event_source(data_class=S3Event) - def lambda_handler(event: S3Event, context): - bucket_name = event.bucket_name - - # Multiple records can be delivered in a single event - for record in event.records: - object_key = unquote_plus(record.s3.get_object.key) - - do_something_with(f"{bucket_name}/{object_key}") + --8<-- "docs/examples/utilities/data_classes/app_s3.py" ``` ### S3 Object Lambda @@ -889,35 +505,8 @@ This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda === "app.py" - ```python hl_lines="5-6 12 14" - import boto3 - import requests - - from aws_lambda_powertools import Logger - from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA - from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent - - logger = Logger() - session = boto3.Session() - s3 = session.client("s3") - - @logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True) - def lambda_handler(event, context): - event = S3ObjectLambdaEvent(event) - - # Get object from S3 - response = requests.get(event.input_s3_url) - original_object = response.content.decode("utf-8") - - # Make changes to the object about to be returned - transformed_object = original_object.upper() - - # Write object back to S3 Object Lambda - s3.write_get_object_response( - Body=transformed_object, RequestRoute=event.request_route, RequestToken=event.request_token - ) - - return {"status_code": 200} + ```python hl_lines="5-6 13 15" + --8<-- "docs/examples/utilities/data_classes/app_s3_object_lambda.py" ``` ### SES @@ -925,16 +514,7 @@ This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, SESEvent - - @event_source(data_class=SESEvent) - def lambda_handler(event: SESEvent, context): - # Multiple records can be delivered in a single event - for record in event.records: - mail = record.ses.mail - common_headers = mail.common_headers - - do_something_with(common_headers.to, common_headers.subject) + --8<-- "docs/examples/utilities/data_classes/app_ses.py" ``` ### SNS @@ -942,16 +522,7 @@ This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, SNSEvent - - @event_source(data_class=SNSEvent) - def lambda_handler(event: SNSEvent, context): - # Multiple records can be delivered in a single event - for record in event.records: - message = record.sns.message - subject = record.sns.subject - - do_something_with(subject, message) + --8<-- "docs/examples/utilities/data_classes/app_sns.py" ``` ### SQS @@ -959,11 +530,5 @@ This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda === "app.py" ```python - from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent - - @event_source(data_class=SQSEvent) - def lambda_handler(event: SQSEvent, context): - # Multiple records can be delivered in a single event - for record in event.records: - do_something_with(record.body) + --8<-- "docs/examples/utilities/data_classes/app_sqs.py" ```