diff --git a/aws_lambda_powertools/utilities/data_classes/alb_event.py b/aws_lambda_powertools/utilities/data_classes/alb_event.py index 1e403d6f692..84cd51bc4b1 100644 --- a/aws_lambda_powertools/utilities/data_classes/alb_event.py +++ b/aws_lambda_powertools/utilities/data_classes/alb_event.py @@ -18,7 +18,7 @@ class ALBEventRequestContext(DictWrapper): @property def elb_target_group_arn(self) -> str: """Target group arn for your Lambda function""" - return self["requestContext"]["elb"]["targetGroupArn"] + return self["elb"]["targetGroupArn"] class ALBEvent(BaseProxyEvent): @@ -32,7 +32,7 @@ class ALBEvent(BaseProxyEvent): @property def request_context(self) -> ALBEventRequestContext: - return ALBEventRequestContext(self._data) + return ALBEventRequestContext(self["requestContext"]) @property def multi_value_query_string_parameters(self) -> dict[str, list[str]]: diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py index f77cb467996..e9b77209860 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py @@ -183,7 +183,7 @@ def stage_variables(self) -> dict[str, str]: @property def request_context(self) -> BaseRequestContext: - return BaseRequestContext(self._data) + return BaseRequestContext(self["requestContext"]) @overload def get_header_value( @@ -306,7 +306,7 @@ def query_string_parameters(self) -> dict[str, str]: @property def request_context(self) -> BaseRequestContextV2: - return BaseRequestContextV2(self._data) + return BaseRequestContextV2(self["requestContext"]) @property def path_parameters(self) -> dict[str, str]: diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py index f173742fff3..78a7b8fcd67 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py @@ -61,42 +61,41 @@ class APIGatewayEventRequestContext(BaseRequestContext): @property def connected_at(self) -> int | None: """The Epoch-formatted connection time. (WebSocket API)""" - return self["requestContext"].get("connectedAt") + return self.get("connectedAt") @property def connection_id(self) -> str | None: """A unique ID for the connection that can be used to make a callback to the client. (WebSocket API)""" - return self["requestContext"].get("connectionId") + return self.get("connectionId") @property def event_type(self) -> str | None: """The event type: `CONNECT`, `MESSAGE`, or `DISCONNECT`. (WebSocket API)""" - return self["requestContext"].get("eventType") + return self.get("eventType") @property def message_direction(self) -> str | None: """Message direction (WebSocket API)""" - return self["requestContext"].get("messageDirection") + return self.get("messageDirection") @property def message_id(self) -> str | None: """A unique server-side ID for a message. Available only when the `eventType` is `MESSAGE`.""" - return self["requestContext"].get("messageId") + return self.get("messageId") @property def operation_name(self) -> str | None: """The name of the operation being performed""" - return self["requestContext"].get("operationName") + return self.get("operationName") @property def route_key(self) -> str | None: """The selected route key.""" - return self["requestContext"].get("routeKey") + return self.get("routeKey") @property def authorizer(self) -> APIGatewayEventAuthorizer: - authz_data = self._data.get("requestContext", {}).get("authorizer", {}) - return APIGatewayEventAuthorizer(authz_data) + return APIGatewayEventAuthorizer(self.get("authorizer") or {}) class APIGatewayProxyEvent(BaseProxyEvent): @@ -136,7 +135,7 @@ def resolved_headers_field(self) -> dict[str, Any]: @property def request_context(self) -> APIGatewayEventRequestContext: - return APIGatewayEventRequestContext(self._data) + return APIGatewayEventRequestContext(self["requestContext"]) @property def path_parameters(self) -> dict[str, str]: @@ -248,8 +247,7 @@ def iam(self) -> RequestContextV2AuthorizerIam: class RequestContextV2(BaseRequestContextV2): @property def authorizer(self) -> RequestContextV2Authorizer: - ctx = self.get("requestContext") or {} # key might exist but can be `null` - return RequestContextV2Authorizer(ctx.get("authorizer", {})) + return RequestContextV2Authorizer(self.get("authorizer") or {}) class APIGatewayProxyEventV2(BaseProxyEvent): @@ -291,7 +289,7 @@ def cookies(self) -> list[str]: @property def request_context(self) -> RequestContextV2: - return RequestContextV2(self._data) + return RequestContextV2(self["requestContext"]) @property def path_parameters(self) -> dict[str, str]: diff --git a/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py index a111084b306..c8f8c0e9bbf 100644 --- a/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py +++ b/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py @@ -11,32 +11,32 @@ class AppSyncAuthorizerEventRequestContext(DictWrapper): @property def api_id(self) -> str: """AppSync API ID""" - return self["requestContext"]["apiId"] + return self["apiId"] @property def account_id(self) -> str: """AWS Account ID""" - return self["requestContext"]["accountId"] + return self["accountId"] @property def request_id(self) -> str: """Requestt ID""" - return self["requestContext"]["requestId"] + return self["requestId"] @property def query_string(self) -> str: """GraphQL query string""" - return self["requestContext"]["queryString"] + return self["queryString"] @property def operation_name(self) -> str | None: """GraphQL operation name, optional""" - return self["requestContext"].get("operationName") + return self.get("operationName") @property def variables(self) -> dict: """GraphQL variables""" - return self["requestContext"]["variables"] + return self["variables"] class AppSyncAuthorizerEvent(DictWrapper): @@ -57,7 +57,7 @@ def authorization_token(self) -> str: @property def request_context(self) -> AppSyncAuthorizerEventRequestContext: """Request context""" - return AppSyncAuthorizerEventRequestContext(self._data) + return AppSyncAuthorizerEventRequestContext(self["requestContext"]) class AppSyncAuthorizerResponse: diff --git a/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py b/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py index 0734a98750e..ddac1109dcc 100644 --- a/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py @@ -9,12 +9,12 @@ class CallerContext(DictWrapper): @property def aws_sdk_version(self) -> str: """The AWS SDK version number.""" - return self["callerContext"]["awsSdkVersion"] + return self["awsSdkVersion"] @property def client_id(self) -> str: """The ID of the client associated with the user pool.""" - return self["callerContext"]["clientId"] + return self["clientId"] class BaseTriggerEvent(DictWrapper): @@ -53,54 +53,54 @@ def user_name(self) -> str: @property def caller_context(self) -> CallerContext: """The caller context""" - return CallerContext(self._data) + return CallerContext(self["callerContext"]) class PreSignUpTriggerEventRequest(DictWrapper): @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def validation_data(self) -> dict[str, str]: """One or more name-value pairs containing the validation data in the request to register a user.""" - return self["request"].get("validationData") or {} + return self.get("validationData") or {} @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class PreSignUpTriggerEventResponse(DictWrapper): @property def auto_confirm_user(self) -> bool: - return bool(self["response"]["autoConfirmUser"]) + return bool(self["autoConfirmUser"]) @auto_confirm_user.setter def auto_confirm_user(self, value: bool): """Set to true to auto-confirm the user, or false otherwise.""" - self["response"]["autoConfirmUser"] = value + self._data["autoConfirmUser"] = value @property def auto_verify_email(self) -> bool: - return bool(self["response"]["autoVerifyEmail"]) + return bool(self["autoVerifyEmail"]) @auto_verify_email.setter def auto_verify_email(self, value: bool): """Set to true to set as verified the email of a user who is signing up, or false otherwise.""" - self["response"]["autoVerifyEmail"] = value + self._data["autoVerifyEmail"] = value @property def auto_verify_phone(self) -> bool: - return bool(self["response"]["autoVerifyPhone"]) + return bool(self["autoVerifyPhone"]) @auto_verify_phone.setter def auto_verify_phone(self, value: bool): """Set to true to set as verified the phone number of a user who is signing up, or false otherwise.""" - self["response"]["autoVerifyPhone"] = value + self._data["autoVerifyPhone"] = value class PreSignUpTriggerEvent(BaseTriggerEvent): @@ -121,24 +121,24 @@ class PreSignUpTriggerEvent(BaseTriggerEvent): @property def request(self) -> PreSignUpTriggerEventRequest: - return PreSignUpTriggerEventRequest(self._data) + return PreSignUpTriggerEventRequest(self["request"]) @property def response(self) -> PreSignUpTriggerEventResponse: - return PreSignUpTriggerEventResponse(self._data) + return PreSignUpTriggerEventResponse(self["response"]) class PostConfirmationTriggerEventRequest(DictWrapper): @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the post confirmation trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class PostConfirmationTriggerEvent(BaseTriggerEvent): @@ -158,41 +158,41 @@ class PostConfirmationTriggerEvent(BaseTriggerEvent): @property def request(self) -> PostConfirmationTriggerEventRequest: - return PostConfirmationTriggerEventRequest(self._data) + return PostConfirmationTriggerEventRequest(self["request"]) class UserMigrationTriggerEventRequest(DictWrapper): @property def password(self) -> str: - return self["request"]["password"] + return self["password"] @property def validation_data(self) -> dict[str, str]: """One or more name-value pairs containing the validation data in the request to register a user.""" - return self["request"].get("validationData") or {} + return self.get("validationData") or {} @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class UserMigrationTriggerEventResponse(DictWrapper): @property def user_attributes(self) -> dict[str, str]: - return self["response"]["userAttributes"] + return self["userAttributes"] @user_attributes.setter def user_attributes(self, value: dict[str, str]): """It must contain one or more name-value pairs representing user attributes to be stored in the user profile in your user pool. You can include both standard and custom user attributes. Custom attributes require the custom: prefix to distinguish them from standard attributes.""" - self["response"]["userAttributes"] = value + self._data["userAttributes"] = value @property def final_user_status(self) -> str | None: - return self["response"].get("finalUserStatus") + return self.get("finalUserStatus") @final_user_status.setter def final_user_status(self, value: str): @@ -202,31 +202,31 @@ def final_user_status(self, value: str): If this attribute is set to RESET_REQUIRED, the user is required to change his or her password immediately after migration at the time of sign-in, and your client app needs to handle the PasswordResetRequiredException during the authentication flow.""" - self["response"]["finalUserStatus"] = value + self._data["finalUserStatus"] = value @property def message_action(self) -> str | None: - return self["response"].get("messageAction") + return self.get("messageAction") @message_action.setter def message_action(self, value: str): """This attribute can be set to "SUPPRESS" to suppress the welcome message usually sent by Amazon Cognito to new users. If this attribute is not returned, the welcome message will be sent.""" - self["response"]["messageAction"] = value + self._data["messageAction"] = value @property def desired_delivery_mediums(self) -> list[str]: - return self["response"].get("desiredDeliveryMediums") or [] + return self.get("desiredDeliveryMediums") or [] @desired_delivery_mediums.setter def desired_delivery_mediums(self, value: list[str]): """This attribute can be set to "EMAIL" to send the welcome message by email, or "SMS" to send the welcome message by SMS. If this attribute is not returned, the welcome message will be sent by SMS.""" - self["response"]["desiredDeliveryMediums"] = value + self._data["desiredDeliveryMediums"] = value @property def force_alias_creation(self) -> bool | None: - return self["response"].get("forceAliasCreation") + return self.get("forceAliasCreation") @force_alias_creation.setter def force_alias_creation(self, value: bool): @@ -239,11 +239,11 @@ def force_alias_creation(self, value: bool): If this attribute is not returned, it is assumed to be "false". """ - self["response"]["forceAliasCreation"] = value + self._data["forceAliasCreation"] = value @property def enable_sms_mfa(self) -> bool | None: - return self["response"].get("enableSMSMFA") + return self.get("enableSMSMFA") @enable_sms_mfa.setter def enable_sms_mfa(self, value: bool): @@ -251,7 +251,7 @@ def enable_sms_mfa(self, value: bool): authentication (MFA) to sign in. Your user pool must have MFA enabled. Your user's attributes in the request parameters must include a phone number, or else the migration of that user will fail. """ - self["response"]["enableSMSMFA"] = value + self._data["enableSMSMFA"] = value class UserMigrationTriggerEvent(BaseTriggerEvent): @@ -271,70 +271,70 @@ class UserMigrationTriggerEvent(BaseTriggerEvent): @property def request(self) -> UserMigrationTriggerEventRequest: - return UserMigrationTriggerEventRequest(self._data) + return UserMigrationTriggerEventRequest(self["request"]) @property def response(self) -> UserMigrationTriggerEventResponse: - return UserMigrationTriggerEventResponse(self._data) + return UserMigrationTriggerEventResponse(self["response"]) class CustomMessageTriggerEventRequest(DictWrapper): @property def code_parameter(self) -> str: """A string for you to use as the placeholder for the verification code in the custom message.""" - return self["request"]["codeParameter"] + return self["codeParameter"] @property def link_parameter(self) -> str: """A string for you to use as a placeholder for the verification link in the custom message.""" - return self["request"]["linkParameter"] + return self["linkParameter"] @property def username_parameter(self) -> str: """The username parameter. It is a required request parameter for the admin create user flow.""" - return self["request"]["usernameParameter"] + return self["usernameParameter"] @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class CustomMessageTriggerEventResponse(DictWrapper): @property def sms_message(self) -> str: - return self["response"]["smsMessage"] + return self["smsMessage"] @sms_message.setter def sms_message(self, value: str): """The custom SMS message to be sent to your users. Must include the codeParameter value received in the request.""" - self["response"]["smsMessage"] = value + self._data["smsMessage"] = value @property def email_message(self) -> str: - return self["response"]["emailMessage"] + return self["emailMessage"] @email_message.setter def email_message(self, value: str): """The custom email message to be sent to your users. Must include the codeParameter value received in the request.""" - self["response"]["emailMessage"] = value + self._data["emailMessage"] = value @property def email_subject(self) -> str: - return self["response"]["emailSubject"] + return self["emailSubject"] @email_subject.setter def email_subject(self, value: str): """The subject line for the custom message.""" - self["response"]["emailSubject"] = value + self._data["emailSubject"] = value class CustomMessageTriggerEvent(BaseTriggerEvent): @@ -361,28 +361,28 @@ class CustomMessageTriggerEvent(BaseTriggerEvent): @property def request(self) -> CustomMessageTriggerEventRequest: - return CustomMessageTriggerEventRequest(self._data) + return CustomMessageTriggerEventRequest(self["request"]) @property def response(self) -> CustomMessageTriggerEventResponse: - return CustomMessageTriggerEventResponse(self._data) + return CustomMessageTriggerEventResponse(self["response"]) class PreAuthenticationTriggerEventRequest(DictWrapper): @property def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" - return self["request"].get("userNotFound") + return self.get("userNotFound") @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def validation_data(self) -> dict[str, str]: """One or more key-value pairs containing the validation data in the user's sign-in request.""" - return self["request"].get("validationData") or {} + return self.get("validationData") or {} class PreAuthenticationTriggerEvent(BaseTriggerEvent): @@ -405,7 +405,7 @@ class PreAuthenticationTriggerEvent(BaseTriggerEvent): @property def request(self) -> PreAuthenticationTriggerEventRequest: """Pre Authentication Request Parameters""" - return PreAuthenticationTriggerEventRequest(self._data) + return PreAuthenticationTriggerEventRequest(self["request"]) class PostAuthenticationTriggerEventRequest(DictWrapper): @@ -413,18 +413,18 @@ class PostAuthenticationTriggerEventRequest(DictWrapper): def new_device_used(self) -> bool: """This flag indicates if the user has signed in on a new device. It is set only if the remembered devices value of the user pool is set to `Always` or User `Opt-In`.""" - return self["request"]["newDeviceUsed"] + return self["newDeviceUsed"] @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the post authentication trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class PostAuthenticationTriggerEvent(BaseTriggerEvent): @@ -447,7 +447,7 @@ class PostAuthenticationTriggerEvent(BaseTriggerEvent): @property def request(self) -> PostAuthenticationTriggerEventRequest: """Post Authentication Request Parameters""" - return PostAuthenticationTriggerEventRequest(self._data) + return PostAuthenticationTriggerEventRequest(self["request"]) class GroupOverrideDetails(DictWrapper): @@ -471,18 +471,18 @@ class PreTokenGenerationTriggerEventRequest(DictWrapper): @property def group_configuration(self) -> GroupOverrideDetails: """The input object containing the current group configuration""" - return GroupOverrideDetails(self["request"]["groupConfiguration"]) + return GroupOverrideDetails(self["groupConfiguration"]) @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" - return self["request"].get("userAttributes") or {} + return self.get("userAttributes") or {} @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre token generation trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class PreTokenGenerationTriggerV2EventRequest(PreTokenGenerationTriggerEventRequest): @@ -492,7 +492,7 @@ def scopes(self) -> list[str]: the user pool standard and custom scopes that your user requested, and that you authorized your app client to issue. """ - return self["request"].get("scopes") + return self.get("scopes") or [] class ClaimsOverrideDetails(DictWrapper): @@ -669,19 +669,13 @@ def set_group_configuration_preferred_role(self, value: str): class PreTokenGenerationTriggerEventResponse(DictWrapper): @property def claims_override_details(self) -> ClaimsOverrideDetails: - # Ensure we have a `claimsOverrideDetails` element and is not set to None - if self._data["response"].get("claimsOverrideDetails") is None: - self._data["response"]["claimsOverrideDetails"] = {} - return ClaimsOverrideDetails(self._data["response"]["claimsOverrideDetails"]) + return ClaimsOverrideDetails(self.get("claimsOverrideDetails") or {}) class PreTokenGenerationTriggerV2EventResponse(DictWrapper): @property def claims_scope_override_details(self) -> ClaimsAndScopeOverrideDetails: - # Ensure we have a `claimsAndScopeOverrideDetails` element and is not set to None - if self._data["response"].get("claimsAndScopeOverrideDetails") is None: - self._data["response"]["claimsAndScopeOverrideDetails"] = {} - return ClaimsAndScopeOverrideDetails(self._data["response"]["claimsAndScopeOverrideDetails"]) + return ClaimsAndScopeOverrideDetails(self.get("claimsAndScopeOverrideDetails") or {}) class PreTokenGenerationTriggerEvent(BaseTriggerEvent): @@ -708,12 +702,12 @@ class PreTokenGenerationTriggerEvent(BaseTriggerEvent): @property def request(self) -> PreTokenGenerationTriggerEventRequest: """Pre Token Generation Request Parameters""" - return PreTokenGenerationTriggerEventRequest(self._data) + return PreTokenGenerationTriggerEventRequest(self["request"]) @property def response(self) -> PreTokenGenerationTriggerEventResponse: """Pre Token Generation Response Parameters""" - return PreTokenGenerationTriggerEventResponse(self._data) + return PreTokenGenerationTriggerEventResponse(self["response"]) class PreTokenGenerationV2TriggerEvent(BaseTriggerEvent): @@ -740,12 +734,12 @@ class PreTokenGenerationV2TriggerEvent(BaseTriggerEvent): @property def request(self) -> PreTokenGenerationTriggerV2EventRequest: """Pre Token Generation Request V2 Parameters""" - return PreTokenGenerationTriggerV2EventRequest(self._data) + return PreTokenGenerationTriggerV2EventRequest(self["request"]) @property def response(self) -> PreTokenGenerationTriggerV2EventResponse: """Pre Token Generation Response V2 Parameters""" - return PreTokenGenerationTriggerV2EventResponse(self._data) + return PreTokenGenerationTriggerV2EventResponse(self["response"]) class ChallengeResult(DictWrapper): @@ -772,55 +766,55 @@ class DefineAuthChallengeTriggerEventRequest(DictWrapper): @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def user_not_found(self) -> bool | None: """A Boolean that is populated when PreventUserExistenceErrors is set to ENABLED for your user pool client. A value of true means that the user id (username, email address, etc.) did not match any existing users.""" - return self["request"].get("userNotFound") + return self.get("userNotFound") @property def session(self) -> list[ChallengeResult]: """An array of ChallengeResult elements, each of which contains the following elements:""" - return [ChallengeResult(result) for result in self["request"]["session"]] + return [ChallengeResult(result) for result in self["session"]] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the defined auth challenge trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class DefineAuthChallengeTriggerEventResponse(DictWrapper): @property def challenge_name(self) -> str: - return self["response"]["challengeName"] + return self["challengeName"] @challenge_name.setter def challenge_name(self, value: str): """A string containing the name of the next challenge. If you want to present a new challenge to your user, specify the challenge name here.""" - self["response"]["challengeName"] = value + self._data["challengeName"] = value @property def fail_authentication(self) -> bool: - return bool(self["response"]["failAuthentication"]) + return bool(self["failAuthentication"]) @fail_authentication.setter def fail_authentication(self, value: bool): """Set to true if you want to terminate the current authentication process, or false otherwise.""" - self["response"]["failAuthentication"] = value + self._data["failAuthentication"] = value @property def issue_tokens(self) -> bool: - return bool(self["response"]["issueTokens"]) + return bool(self["issueTokens"]) @issue_tokens.setter def issue_tokens(self, value: bool): """Set to true if you determine that the user has been sufficiently authenticated by completing the challenges, or false otherwise.""" - self["response"]["issueTokens"] = value + self._data["issueTokens"] = value class DefineAuthChallengeTriggerEvent(BaseTriggerEvent): @@ -842,57 +836,57 @@ class DefineAuthChallengeTriggerEvent(BaseTriggerEvent): @property def request(self) -> DefineAuthChallengeTriggerEventRequest: """Define Auth Challenge Request Parameters""" - return DefineAuthChallengeTriggerEventRequest(self._data) + return DefineAuthChallengeTriggerEventRequest(self["request"]) @property def response(self) -> DefineAuthChallengeTriggerEventResponse: """Define Auth Challenge Response Parameters""" - return DefineAuthChallengeTriggerEventResponse(self._data) + return DefineAuthChallengeTriggerEventResponse(self["response"]) class CreateAuthChallengeTriggerEventRequest(DictWrapper): @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" - return self["request"].get("userNotFound") + return self.get("userNotFound") @property def challenge_name(self) -> str: """The name of the new challenge.""" - return self["request"]["challengeName"] + return self["challengeName"] @property def session(self) -> list[ChallengeResult]: """An array of ChallengeResult elements, each of which contains the following elements:""" - return [ChallengeResult(result) for result in self["request"]["session"]] + return [ChallengeResult(result) for result in self["session"]] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the creation auth challenge trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class CreateAuthChallengeTriggerEventResponse(DictWrapper): @property def public_challenge_parameters(self) -> dict[str, str]: - return self["response"]["publicChallengeParameters"] + return self["publicChallengeParameters"] @public_challenge_parameters.setter def public_challenge_parameters(self, value: dict[str, str]): """One or more key-value pairs for the client app to use in the challenge to be presented to the user. This parameter should contain all the necessary information to accurately present the challenge to the user.""" - self["response"]["publicChallengeParameters"] = value + self._data["publicChallengeParameters"] = value @property def private_challenge_parameters(self) -> dict[str, str]: - return self["response"]["privateChallengeParameters"] + return self["privateChallengeParameters"] @private_challenge_parameters.setter def private_challenge_parameters(self, value: dict[str, str]): @@ -901,16 +895,16 @@ def private_challenge_parameters(self, value: dict[str, str]): response to the challenge. In other words, the publicChallengeParameters parameter contains the question that is presented to the user and privateChallengeParameters contains the valid answers for the question.""" - self["response"]["privateChallengeParameters"] = value + self._data["privateChallengeParameters"] = value @property def challenge_metadata(self) -> str: - return self["response"]["challengeMetadata"] + return self["challengeMetadata"] @challenge_metadata.setter def challenge_metadata(self, value: str): """Your name for the custom challenge, if this is a custom challenge.""" - self["response"]["challengeMetadata"] = value + self._data["challengeMetadata"] = value class CreateAuthChallengeTriggerEvent(BaseTriggerEvent): @@ -934,52 +928,52 @@ class CreateAuthChallengeTriggerEvent(BaseTriggerEvent): @property def request(self) -> CreateAuthChallengeTriggerEventRequest: """Create Auth Challenge Request Parameters""" - return CreateAuthChallengeTriggerEventRequest(self._data) + return CreateAuthChallengeTriggerEventRequest(self["request"]) @property def response(self) -> CreateAuthChallengeTriggerEventResponse: """Create Auth Challenge Response Parameters""" - return CreateAuthChallengeTriggerEventResponse(self._data) + return CreateAuthChallengeTriggerEventResponse(self["response"]) class VerifyAuthChallengeResponseTriggerEventRequest(DictWrapper): @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def private_challenge_parameters(self) -> dict[str, str]: """This parameter comes from the Create Auth Challenge trigger, and is compared against a user’s challengeAnswer to determine whether the user passed the challenge.""" - return self["request"]["privateChallengeParameters"] + return self["privateChallengeParameters"] @property def challenge_answer(self) -> Any: """The answer from the user's response to the challenge.""" - return self["request"]["challengeAnswer"] + return self["challengeAnswer"] @property def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the "Verify Auth Challenge" trigger.""" - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} @property def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" - return self["request"].get("userNotFound") + return self.get("userNotFound") class VerifyAuthChallengeResponseTriggerEventResponse(DictWrapper): @property def answer_correct(self) -> bool: - return bool(self["response"]["answerCorrect"]) + return bool(self["answerCorrect"]) @answer_correct.setter def answer_correct(self, value: bool): """Set to true if the user has successfully completed the challenge, or false otherwise.""" - self["response"]["answerCorrect"] = value + self._data["answerCorrect"] = value class VerifyAuthChallengeResponseTriggerEvent(BaseTriggerEvent): @@ -1003,12 +997,12 @@ class VerifyAuthChallengeResponseTriggerEvent(BaseTriggerEvent): @property def request(self) -> VerifyAuthChallengeResponseTriggerEventRequest: """Verify Auth Challenge Request Parameters""" - return VerifyAuthChallengeResponseTriggerEventRequest(self._data) + return VerifyAuthChallengeResponseTriggerEventRequest(self["request"]) @property def response(self) -> VerifyAuthChallengeResponseTriggerEventResponse: """Verify Auth Challenge Response Parameters""" - return VerifyAuthChallengeResponseTriggerEventResponse(self._data) + return VerifyAuthChallengeResponseTriggerEventResponse(self["response"]) class CustomEmailSenderTriggerEventRequest(DictWrapper): @@ -1017,17 +1011,17 @@ def type(self) -> str: """The request version. For a custom email sender event, the value of this string is always customEmailSenderRequestV1. """ - return self["request"]["type"] + return self["type"] @property def code(self) -> str: """The encrypted code that your function can decrypt and send to your user.""" - return self["request"]["code"] + return self["code"] @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"]["userAttributes"] + return self["userAttributes"] @property def client_metadata(self) -> dict[str, str]: @@ -1038,14 +1032,14 @@ def client_metadata(self) -> dict[str, str]: ClientMetadata parameter in AdminInitiateAuth and InitiateAuth API operations in the request that it passes to the post authentication function. """ - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class CustomEmailSenderTriggerEvent(BaseTriggerEvent): @property def request(self) -> CustomEmailSenderTriggerEventRequest: """Custom Email Sender Request Parameters""" - return CustomEmailSenderTriggerEventRequest(self._data) + return CustomEmailSenderTriggerEventRequest(self["request"]) class CustomSMSSenderTriggerEventRequest(DictWrapper): @@ -1054,17 +1048,17 @@ def type(self) -> str: """The request version. For a custom SMS sender event, the value of this string is always customSMSSenderRequestV1. """ - return self["request"]["type"] + return self["type"] @property def code(self) -> str: """The encrypted code that your function can decrypt and send to your user.""" - return self["request"]["code"] + return self["code"] @property def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" - return self["request"].get("userAttributes") or {} + return self.get("userAttributes") or {} @property def client_metadata(self) -> dict[str, str]: @@ -1075,11 +1069,11 @@ def client_metadata(self) -> dict[str, str]: ClientMetadata parameter in AdminInitiateAuth and InitiateAuth API operations in the request that it passes to the post authentication function. """ - return self["request"].get("clientMetadata") or {} + return self.get("clientMetadata") or {} class CustomSMSSenderTriggerEvent(BaseTriggerEvent): @property def request(self) -> CustomSMSSenderTriggerEventRequest: """Custom SMS Sender Request Parameters""" - return CustomSMSSenderTriggerEventRequest(self._data) + return CustomSMSSenderTriggerEventRequest(self["request"]) diff --git a/aws_lambda_powertools/utilities/data_classes/common.py b/aws_lambda_powertools/utilities/data_classes/common.py index 8374bf5ee08..5af24c27a13 100644 --- a/aws_lambda_powertools/utilities/data_classes/common.py +++ b/aws_lambda_powertools/utilities/data_classes/common.py @@ -205,7 +205,6 @@ def json_body(self) -> Any: """Parses the submitted body as json""" if self.decoded_body: return self._json_deserializer(self.decoded_body) - return None @cached_property @@ -370,81 +369,81 @@ def validity_not_before(self) -> str: class APIGatewayEventIdentity(DictWrapper): @property def access_key(self) -> str | None: - return self["requestContext"]["identity"].get("accessKey") + return self.get("accessKey") @property def account_id(self) -> str | None: """The AWS account ID associated with the request.""" - return self["requestContext"]["identity"].get("accountId") + return self.get("accountId") @property def api_key(self) -> str | None: """For API methods that require an API key, this variable is the API key associated with the method request. For methods that don't require an API key, this variable is null.""" - return self["requestContext"]["identity"].get("apiKey") + return self.get("apiKey") @property def api_key_id(self) -> str | None: """The API key ID associated with an API request that requires an API key.""" - return self["requestContext"]["identity"].get("apiKeyId") + return self.get("apiKeyId") @property def caller(self) -> str | None: """The principal identifier of the caller making the request.""" - return self["requestContext"]["identity"].get("caller") + return self.get("caller") @property def cognito_authentication_provider(self) -> str | None: """A comma-separated list of the Amazon Cognito authentication providers used by the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoAuthenticationProvider") + return self.get("cognitoAuthenticationProvider") @property def cognito_authentication_type(self) -> str | None: """The Amazon Cognito authentication type of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoAuthenticationType") + return self.get("cognitoAuthenticationType") @property def cognito_identity_id(self) -> str | None: """The Amazon Cognito identity ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoIdentityId") + return self.get("cognitoIdentityId") @property def cognito_identity_pool_id(self) -> str | None: """The Amazon Cognito identity pool ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoIdentityPoolId") + return self.get("cognitoIdentityPoolId") @property def principal_org_id(self) -> str | None: """The AWS organization ID.""" - return self["requestContext"]["identity"].get("principalOrgId") + return self.get("principalOrgId") @property def source_ip(self) -> str: """The source IP address of the TCP connection making the request to API Gateway.""" - return self["requestContext"]["identity"]["sourceIp"] + return self["sourceIp"] @property def user(self) -> str | None: """The principal identifier of the user making the request.""" - return self["requestContext"]["identity"].get("user") + return self.get("user") @property def user_agent(self) -> str | None: """The User Agent of the API caller.""" - return self["requestContext"]["identity"].get("userAgent") + return self.get("userAgent") @property def user_arn(self) -> str | None: """The Amazon Resource Name (ARN) of the effective user identified after authentication.""" - return self["requestContext"]["identity"].get("userArn") + return self.get("userArn") @property def client_cert(self) -> RequestContextClientCert | None: - client_cert = self["requestContext"]["identity"].get("clientCert") + client_cert = self.get("clientCert") return None if client_cert is None else RequestContextClientCert(client_cert) @@ -452,153 +451,153 @@ class BaseRequestContext(DictWrapper): @property def account_id(self) -> str: """The AWS account ID associated with the request.""" - return self["requestContext"]["accountId"] + return self["accountId"] @property def api_id(self) -> str: """The identifier API Gateway assigns to your API.""" - return self["requestContext"]["apiId"] + return self["apiId"] @property def domain_name(self) -> str | None: """A domain name""" - return self["requestContext"].get("domainName") + return self.get("domainName") @property def domain_prefix(self) -> str | None: - return self["requestContext"].get("domainPrefix") + return self.get("domainPrefix") @property def extended_request_id(self) -> str | None: """An automatically generated ID for the API call, which contains more useful information for debugging/troubleshooting.""" - return self["requestContext"].get("extendedRequestId") + return self.get("extendedRequestId") @property def protocol(self) -> str: """The request protocol, for example, HTTP/1.1.""" - return self["requestContext"]["protocol"] + return self["protocol"] @property def http_method(self) -> str: """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT.""" - return self["requestContext"]["httpMethod"] + return self["httpMethod"] @property def identity(self) -> APIGatewayEventIdentity: - return APIGatewayEventIdentity(self._data) + return APIGatewayEventIdentity(self["identity"]) @property def path(self) -> str: - return self["requestContext"]["path"] + return self["path"] @property def stage(self) -> str: """The deployment stage of the API request""" - return self["requestContext"]["stage"] + return self["stage"] @property def request_id(self) -> str: """The ID that API Gateway assigns to the API request.""" - return self["requestContext"]["requestId"] + return self["requestId"] @property def request_time(self) -> str | None: """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm)""" - return self["requestContext"].get("requestTime") + return self.get("requestTime") @property def request_time_epoch(self) -> int: """The Epoch-formatted request time.""" - return self["requestContext"]["requestTimeEpoch"] + return self["requestTimeEpoch"] @property def resource_id(self) -> str: - return self["requestContext"]["resourceId"] + return self["resourceId"] @property def resource_path(self) -> str: - return self["requestContext"]["resourcePath"] + return self["resourcePath"] class RequestContextV2Http(DictWrapper): @property def method(self) -> str: - return self["requestContext"]["http"]["method"] + return self["method"] @property def path(self) -> str: - return self["requestContext"]["http"]["path"] + return self["path"] @property def protocol(self) -> str: """The request protocol, for example, HTTP/1.1.""" - return self["requestContext"]["http"]["protocol"] + return self["protocol"] @property def source_ip(self) -> str: """The source IP address of the TCP connection making the request to API Gateway.""" - return self["requestContext"]["http"]["sourceIp"] + return self["sourceIp"] @property def user_agent(self) -> str: """The User Agent of the API caller.""" - return self["requestContext"]["http"]["userAgent"] + return self["userAgent"] class BaseRequestContextV2(DictWrapper): @property def account_id(self) -> str: """The AWS account ID associated with the request.""" - return self["requestContext"]["accountId"] + return self["accountId"] @property def api_id(self) -> str: """The identifier API Gateway assigns to your API.""" - return self["requestContext"]["apiId"] + return self["apiId"] @property def domain_name(self) -> str: """A domain name""" - return self["requestContext"]["domainName"] + return self["domainName"] @property def domain_prefix(self) -> str: - return self["requestContext"]["domainPrefix"] + return self["domainPrefix"] @property def http(self) -> RequestContextV2Http: - return RequestContextV2Http(self._data) + return RequestContextV2Http(self["http"]) @property def request_id(self) -> str: """The ID that API Gateway assigns to the API request.""" - return self["requestContext"]["requestId"] + return self["requestId"] @property def route_key(self) -> str: """The selected route key.""" - return self["requestContext"]["routeKey"] + return self["routeKey"] @property def stage(self) -> str: """The deployment stage of the API request""" - return self["requestContext"]["stage"] + return self["stage"] @property def time(self) -> str: """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm).""" - return self["requestContext"]["time"] + return self["time"] @property def time_epoch(self) -> int: """The Epoch-formatted request time.""" - return self["requestContext"]["timeEpoch"] + return self["timeEpoch"] @property def authentication(self) -> RequestContextClientCert | None: """Optional when using mutual TLS authentication""" # FunctionURL might have NONE as AuthZ - authentication = self["requestContext"].get("authentication") or {} + authentication = self.get("authentication") or {} client_cert = authentication.get("clientCert") return None if client_cert is None else RequestContextClientCert(client_cert) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 85e75e198f6..bc61cd69cf0 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -177,30 +177,25 @@ def asdict(self) -> dict: class KinesisFirehoseRecordMetadata(DictWrapper): - @property - def _metadata(self) -> dict: - """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return self["kinesisRecordMetadata"] # could raise KeyError - @property def shard_id(self) -> str: """Kinesis stream shard ID; present only when Kinesis Stream is source""" - return self._metadata["shardId"] + return self["shardId"] @property def partition_key(self) -> str: """Kinesis stream partition key; present only when Kinesis Stream is source""" - return self._metadata["partitionKey"] + return self["partitionKey"] @property def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" - return self._metadata["approximateArrivalTimestamp"] + return self["approximateArrivalTimestamp"] @property def sequence_number(self) -> str: """Kinesis stream sequence number; present only when Kinesis Stream is source""" - return self._metadata["sequenceNumber"] + return self["sequenceNumber"] @property def subsequence_number(self) -> int: @@ -208,7 +203,7 @@ def subsequence_number(self) -> int: Note: this will only be present for Kinesis streams using record aggregation """ - return self._metadata["subsequenceNumber"] + return self["subsequenceNumber"] class KinesisFirehoseRecord(DictWrapper): @@ -230,7 +225,8 @@ def data(self) -> str: @property def metadata(self) -> KinesisFirehoseRecordMetadata | None: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None + metadata = self.get("kinesisRecordMetadata") + return KinesisFirehoseRecordMetadata(metadata) if metadata else None @property def data_as_bytes(self) -> bytes: diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index ba2300b34be..8d6c96c557d 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -15,27 +15,27 @@ class KinesisStreamRecordPayload(DictWrapper): @property def approximate_arrival_timestamp(self) -> float: """The approximate time that the record was inserted into the stream""" - return float(self["kinesis"]["approximateArrivalTimestamp"]) + return float(self["approximateArrivalTimestamp"]) @property def data(self) -> str: """The data blob""" - return self["kinesis"]["data"] + return self["data"] @property def kinesis_schema_version(self) -> str: """Schema version for the record""" - return self["kinesis"]["kinesisSchemaVersion"] + return self["kinesisSchemaVersion"] @property def partition_key(self) -> str: """Identifies which shard in the stream the data record is assigned to""" - return self["kinesis"]["partitionKey"] + return self["partitionKey"] @property def sequence_number(self) -> str: """The unique identifier of the record within its shard""" - return self["kinesis"]["sequenceNumber"] + return self["sequenceNumber"] def data_as_bytes(self) -> bytes: """Decode binary encoded data as bytes""" @@ -94,7 +94,7 @@ def invoke_identity_arn(self) -> str: @property def kinesis(self) -> KinesisStreamRecordPayload: """Underlying Kinesis record associated with the event""" - return KinesisStreamRecordPayload(self._data) + return KinesisStreamRecordPayload(self["kinesis"]) class KinesisStreamEvent(DictWrapper): diff --git a/aws_lambda_powertools/utilities/data_classes/s3_event.py b/aws_lambda_powertools/utilities/data_classes/s3_event.py index f3c0fa2adf9..db5debc8db1 100644 --- a/aws_lambda_powertools/utilities/data_classes/s3_event.py +++ b/aws_lambda_powertools/utilities/data_classes/s3_event.py @@ -18,7 +18,7 @@ def principal_id(self) -> str: class S3RequestParameters(DictWrapper): @property def source_ip_address(self) -> str: - return self["requestParameters"]["sourceIPAddress"] + return self["sourceIPAddress"] class S3EventNotificationEventBridgeBucket(DictWrapper): @@ -40,8 +40,8 @@ def size(self) -> int | None: @property def etag(self) -> str: - """Object etag. Object deletion event doesn't contain etag; we default to empty string""" - return self.get("etag", "") # type: ignore[return-value] # false positive + """Object eTag. Object deletion event doesn't contain eTag; we default to empty string""" + return self.get("etag") or "" @property def version_id(self) -> str: @@ -156,77 +156,77 @@ def detail(self) -> S3EventBridgeNotificationDetail: # type: ignore[override] class S3Bucket(DictWrapper): @property def name(self) -> str: - return self["s3"]["bucket"]["name"] + return self["name"] @property def owner_identity(self) -> S3Identity: - return S3Identity(self["s3"]["bucket"]["ownerIdentity"]) + return S3Identity(self["ownerIdentity"]) @property def arn(self) -> str: - return self["s3"]["bucket"]["arn"] + return self["arn"] class S3Object(DictWrapper): @property def key(self) -> str: """Object key""" - return self["s3"]["object"]["key"] + return self["key"] @property def size(self) -> int: """Object byte size""" - return int(self["s3"]["object"]["size"]) + return int(self["size"]) @property def etag(self) -> str: """Object eTag. Object deletion event doesn't contain eTag; we default to empty string""" - return self["s3"]["object"].get("eTag", "") + return self.get("eTag") or "" @property def version_id(self) -> str | None: """Object version if bucket is versioning-enabled, otherwise null""" - return self["s3"]["object"].get("versionId") + return self.get("versionId") @property def sequencer(self) -> str: """A string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs """ - return self["s3"]["object"]["sequencer"] + return self["sequencer"] class S3Message(DictWrapper): @property def s3_schema_version(self) -> str: - return self["s3"]["s3SchemaVersion"] + return self["s3SchemaVersion"] @property def configuration_id(self) -> str: """ID found in the bucket notification configuration""" - return self["s3"]["configurationId"] + return self["configurationId"] @property def bucket(self) -> S3Bucket: - return S3Bucket(self._data) + return S3Bucket(self["bucket"]) @property def get_object(self) -> S3Object: """Get the `object` property as an S3Object""" # Note: this name conflicts with existing python builtins - return S3Object(self._data) + return S3Object(self["object"]) class S3EventRecordGlacierRestoreEventData(DictWrapper): @property def lifecycle_restoration_expiry_time(self) -> str: """Time when the object restoration will be expired.""" - return self["restoreEventData"]["lifecycleRestorationExpiryTime"] + return self["lifecycleRestorationExpiryTime"] @property def lifecycle_restore_storage_class(self) -> str: """Source storage class for restore""" - return self["restoreEventData"]["lifecycleRestoreStorageClass"] + return self["lifecycleRestoreStorageClass"] class S3EventRecordGlacierEventData(DictWrapper): @@ -236,7 +236,7 @@ def restore_event_data(self) -> S3EventRecordGlacierRestoreEventData: The glacierEventData key is only visible for s3:ObjectRestore:Completed events """ - return S3EventRecordGlacierRestoreEventData(self._data) + return S3EventRecordGlacierRestoreEventData(self["restoreEventData"]) class S3EventRecord(DictWrapper): @@ -272,7 +272,7 @@ def user_identity(self) -> S3Identity: @property def request_parameters(self) -> S3RequestParameters: - return S3RequestParameters(self._data) + return S3RequestParameters(self["requestParameters"]) @property def response_elements(self) -> dict[str, str]: @@ -286,7 +286,7 @@ def response_elements(self) -> dict[str, str]: @property def s3(self) -> S3Message: - return S3Message(self._data) + return S3Message(self["s3"]) @property def glacier_event_data(self) -> S3EventRecordGlacierEventData | None: diff --git a/aws_lambda_powertools/utilities/data_classes/ses_event.py b/aws_lambda_powertools/utilities/data_classes/ses_event.py index e50ec9ccc56..1c405e540a7 100644 --- a/aws_lambda_powertools/utilities/data_classes/ses_event.py +++ b/aws_lambda_powertools/utilities/data_classes/ses_event.py @@ -212,11 +212,11 @@ def action(self) -> SESReceiptAction: class SESMessage(DictWrapper): @property def mail(self) -> SESMail: - return SESMail(self["ses"]["mail"]) + return SESMail(self["mail"]) @property def receipt(self) -> SESReceipt: - return SESReceipt(self["ses"]["receipt"]) + return SESReceipt(self["receipt"]) class SESEventRecord(DictWrapper): @@ -232,7 +232,7 @@ def event_version(self) -> str: @property def ses(self) -> SESMessage: - return SESMessage(self._data) + return SESMessage(self["ses"]) class SESEvent(DictWrapper): diff --git a/tests/unit/data_classes/required_dependencies/test_cognito_user_pool_event.py b/tests/unit/data_classes/required_dependencies/test_cognito_user_pool_event.py index ee019605725..99984a3d87d 100644 --- a/tests/unit/data_classes/required_dependencies/test_cognito_user_pool_event.py +++ b/tests/unit/data_classes/required_dependencies/test_cognito_user_pool_event.py @@ -103,11 +103,11 @@ def test_cognito_custom_message_trigger_event(): assert parsed_event.request.client_metadata == {} parsed_event.response.sms_message = "sms" - assert parsed_event.response.sms_message == parsed_event["response"]["smsMessage"] + assert parsed_event.response.sms_message == raw_event["response"]["smsMessage"] parsed_event.response.email_message = "email" - assert parsed_event.response.email_message == parsed_event["response"]["emailMessage"] + assert parsed_event.response.email_message == raw_event["response"]["emailMessage"] parsed_event.response.email_subject = "subject" - assert parsed_event.response.email_subject == parsed_event["response"]["emailSubject"] + assert parsed_event.response.email_subject == raw_event["response"]["emailSubject"] def test_cognito_custom_email_sender_trigger_event(): @@ -141,7 +141,7 @@ def test_cognito_pre_authentication_trigger_event(): assert parsed_event.trigger_source == raw_event["triggerSource"] assert parsed_event.request.user_not_found is None - parsed_event["request"]["userNotFound"] = True + raw_event["request"]["userNotFound"] = True assert parsed_event.request.user_not_found is True assert parsed_event.request.user_attributes.get("email") == raw_event["request"]["userAttributes"]["email"] assert parsed_event.request.validation_data == {} @@ -171,57 +171,42 @@ def test_cognito_pre_token_generation_trigger_event(): assert parsed_event.request.user_attributes.get("email") == raw_event["request"]["userAttributes"]["email"] assert parsed_event.request.client_metadata == {} - parsed_event["request"]["groupConfiguration"]["preferredRole"] = "temp" + raw_event["request"]["groupConfiguration"]["preferredRole"] = "temp" group_configuration = parsed_event.request.group_configuration assert group_configuration.preferred_role == "temp" - assert parsed_event["response"].get("claimsOverrideDetails") is None claims_override_details = parsed_event.response.claims_override_details - assert parsed_event["response"]["claimsOverrideDetails"] == {} - assert claims_override_details.claims_to_add_or_override == {} assert claims_override_details.claims_to_suppress == [] assert claims_override_details.group_configuration is None claims_override_details.group_configuration = {} assert claims_override_details.group_configuration._data == {} - assert parsed_event["response"]["claimsOverrideDetails"]["groupOverrideDetails"] == {} expected_claims = {"test": "value"} claims_override_details.claims_to_add_or_override = expected_claims assert claims_override_details.claims_to_add_or_override["test"] == "value" - assert parsed_event["response"]["claimsOverrideDetails"]["claimsToAddOrOverride"] == expected_claims claims_override_details.claims_to_suppress = ["email"] assert claims_override_details.claims_to_suppress[0] == "email" - assert parsed_event["response"]["claimsOverrideDetails"]["claimsToSuppress"] == ["email"] expected_groups = ["group-A", "group-B"] claims_override_details.set_group_configuration_groups_to_override(expected_groups) assert claims_override_details.group_configuration.groups_to_override == expected_groups - assert ( - parsed_event["response"]["claimsOverrideDetails"]["groupOverrideDetails"]["groupsToOverride"] == expected_groups - ) - claims_override_details = parsed_event.response.claims_override_details - assert claims_override_details["groupOverrideDetails"]["groupsToOverride"] == expected_groups claims_override_details.set_group_configuration_iam_roles_to_override(["role"]) assert claims_override_details.group_configuration.iam_roles_to_override == ["role"] - assert parsed_event["response"]["claimsOverrideDetails"]["groupOverrideDetails"]["iamRolesToOverride"] == ["role"] claims_override_details.set_group_configuration_preferred_role("role_name") assert claims_override_details.group_configuration.preferred_role == "role_name" - assert parsed_event["response"]["claimsOverrideDetails"]["groupOverrideDetails"]["preferredRole"] == "role_name" # Ensure that even if "claimsOverrideDetails" was explicitly set to None # accessing `event.response.claims_override_details` would set it to `{}` - parsed_event["response"]["claimsOverrideDetails"] = None + raw_event["response"]["claimsOverrideDetails"] = None claims_override_details = parsed_event.response.claims_override_details assert claims_override_details._data == {} - assert parsed_event["response"]["claimsOverrideDetails"] == {} claims_override_details.claims_to_suppress = ["email"] assert claims_override_details.claims_to_suppress[0] == "email" - assert parsed_event["response"]["claimsOverrideDetails"]["claimsToSuppress"] == ["email"] def test_cognito_pre_token_v2_generation_trigger_event(): @@ -236,15 +221,12 @@ def test_cognito_pre_token_v2_generation_trigger_event(): assert parsed_event.request.user_attributes.get("email") == raw_event["request"]["userAttributes"]["email"] assert parsed_event.request.client_metadata == {} - parsed_event["request"]["groupConfiguration"]["preferredRole"] = "temp" + raw_event["request"]["groupConfiguration"]["preferredRole"] = "temp" group_configuration = parsed_event.request.group_configuration assert group_configuration.preferred_role == "temp" assert parsed_event.request.scopes == raw_event["request"]["scopes"] - assert parsed_event["response"].get("claimsAndScopeOverrideDetails") is None claims_scope_override_details = parsed_event.response.claims_scope_override_details - assert parsed_event["response"]["claimsAndScopeOverrideDetails"] == {} - claims_scope_override_details.id_token_generation = claims_scope_override_details.access_token_generation = {} assert claims_scope_override_details.id_token_generation.claims_to_add_or_override == {} assert claims_scope_override_details.id_token_generation.claims_to_suppress == [] @@ -258,45 +240,24 @@ def test_cognito_pre_token_v2_generation_trigger_event(): claims_scope_override_details.group_configuration = {} assert claims_scope_override_details.group_configuration._data == {} - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["groupOverrideDetails"] == {} expected_claims = {"test": "value"} claims_scope_override_details.id_token_generation.claims_to_add_or_override = expected_claims claims_scope_override_details.access_token_generation.claims_to_add_or_override = expected_claims assert claims_scope_override_details.id_token_generation.claims_to_add_or_override["test"] == "value" assert claims_scope_override_details.access_token_generation.claims_to_add_or_override["test"] == "value" - assert ( - parsed_event["response"]["claimsAndScopeOverrideDetails"]["idTokenGeneration"]["claimsToAddOrOverride"] - == expected_claims - ) - assert ( - parsed_event["response"]["claimsAndScopeOverrideDetails"]["accessTokenGeneration"]["claimsToAddOrOverride"] - == expected_claims - ) claims_scope_override_details.id_token_generation.claims_to_suppress = ( claims_scope_override_details.access_token_generation.claims_to_suppress ) = ["email"] assert claims_scope_override_details.id_token_generation.claims_to_suppress[0] == "email" assert claims_scope_override_details.access_token_generation.claims_to_suppress[0] == "email" - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["idTokenGeneration"]["claimsToSuppress"] == [ - "email", - ] - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["accessTokenGeneration"]["claimsToSuppress"] == [ - "email", - ] claims_scope_override_details.id_token_generation.scopes_to_suppress = ( claims_scope_override_details.access_token_generation.scopes_to_suppress ) = ["email"] assert claims_scope_override_details.id_token_generation.scopes_to_suppress[0] == "email" assert claims_scope_override_details.access_token_generation.scopes_to_suppress[0] == "email" - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["idTokenGeneration"]["scopesToSuppress"] == [ - "email", - ] - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["accessTokenGeneration"]["scopesToSuppress"] == [ - "email", - ] claims_scope_override_details.id_token_generation.scopes_to_add = ( claims_scope_override_details.access_token_generation.scopes_to_add @@ -305,47 +266,27 @@ def test_cognito_pre_token_v2_generation_trigger_event(): claims_scope_override_details.id_token_generation.scopes_to_add[0] == "email" and claims_scope_override_details.access_token_generation.scopes_to_add[0] == "email" ) - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["idTokenGeneration"]["scopesToAdd"] == ["email"] - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["accessTokenGeneration"]["scopesToAdd"] == [ - "email", - ] expected_groups = ["group-A", "group-B"] claims_scope_override_details.set_group_configuration_groups_to_override(expected_groups) assert claims_scope_override_details.group_configuration.groups_to_override == expected_groups - assert ( - parsed_event["response"]["claimsAndScopeOverrideDetails"]["groupOverrideDetails"]["groupsToOverride"] - == expected_groups - ) claims_scope_override_details = parsed_event.response.claims_scope_override_details - assert claims_scope_override_details["groupOverrideDetails"]["groupsToOverride"] == expected_groups claims_scope_override_details.set_group_configuration_iam_roles_to_override(["role"]) assert claims_scope_override_details.group_configuration.iam_roles_to_override == ["role"] - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["groupOverrideDetails"]["iamRolesToOverride"] == [ - "role", - ] claims_scope_override_details.set_group_configuration_preferred_role("role_name") assert claims_scope_override_details.group_configuration.preferred_role == "role_name" - assert ( - parsed_event["response"]["claimsAndScopeOverrideDetails"]["groupOverrideDetails"]["preferredRole"] - == "role_name" - ) # Ensure that even if "claimsAndScopeOverrideDetails" was explicitly set to None # accessing `event.response.claims_scope_override_details` would set it to `{}` - parsed_event["response"]["claimsAndScopeOverrideDetails"] = None + raw_event["response"]["claimsAndScopeOverrideDetails"] = None claims_scope_override_details = parsed_event.response.claims_scope_override_details assert claims_scope_override_details._data == {} - assert parsed_event["response"]["claimsAndScopeOverrideDetails"] == {} claims_scope_override_details.id_token_generation = {} claims_scope_override_details.id_token_generation.claims_to_suppress = ["email"] assert claims_scope_override_details.id_token_generation.claims_to_suppress[0] == "email" - assert parsed_event["response"]["claimsAndScopeOverrideDetails"]["idTokenGeneration"]["claimsToSuppress"] == [ - "email", - ] def test_cognito_define_auth_challenge_trigger_event(): @@ -367,14 +308,14 @@ def test_cognito_define_auth_challenge_trigger_event(): # Verify setters parsed_event.response.challenge_name = "CUSTOM_CHALLENGE" - assert parsed_event.response.challenge_name == parsed_event["response"]["challengeName"] + assert parsed_event.response.challenge_name == raw_event["response"]["challengeName"] assert parsed_event.response.challenge_name == "CUSTOM_CHALLENGE" parsed_event.response.fail_authentication = True assert parsed_event.response.fail_authentication - assert parsed_event.response.fail_authentication == parsed_event["response"]["failAuthentication"] + assert parsed_event.response.fail_authentication == raw_event["response"]["failAuthentication"] parsed_event.response.issue_tokens = True assert parsed_event.response.issue_tokens - assert parsed_event.response.issue_tokens == parsed_event["response"]["issueTokens"] + assert parsed_event.response.issue_tokens == raw_event["response"]["issueTokens"] def test_create_auth_challenge_trigger_event(): @@ -395,13 +336,13 @@ def test_create_auth_challenge_trigger_event(): # Verify setters parsed_event.response.public_challenge_parameters = {"test": "value"} - assert parsed_event.response.public_challenge_parameters == parsed_event["response"]["publicChallengeParameters"] + assert parsed_event.response.public_challenge_parameters == raw_event["response"]["publicChallengeParameters"] assert parsed_event.response.public_challenge_parameters["test"] == "value" parsed_event.response.private_challenge_parameters = {"private": "value"} - assert parsed_event.response.private_challenge_parameters == parsed_event["response"]["privateChallengeParameters"] + assert parsed_event.response.private_challenge_parameters == raw_event["response"]["privateChallengeParameters"] assert parsed_event.response.private_challenge_parameters["private"] == "value" parsed_event.response.challenge_metadata = "meta" - assert parsed_event.response.challenge_metadata == parsed_event["response"]["challengeMetadata"] + assert parsed_event.response.challenge_metadata == raw_event["response"]["challengeMetadata"] assert parsed_event.response.challenge_metadata == "meta" @@ -423,5 +364,5 @@ def test_verify_auth_challenge_response_trigger_event(): # Verify setters parsed_event.response.answer_correct = True - assert parsed_event.response.answer_correct == parsed_event["response"]["answerCorrect"] + assert parsed_event.response.answer_correct == raw_event["response"]["answerCorrect"] assert parsed_event.response.answer_correct diff --git a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py index f136cdbc0be..e97f15a8277 100644 --- a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py @@ -29,7 +29,6 @@ def test_kinesis_stream_event(): kinesis = record.kinesis kinesis_raw = raw_event["Records"][0]["kinesis"] - assert kinesis._data["kinesis"] == kinesis_raw assert kinesis.approximate_arrival_timestamp == kinesis_raw["approximateArrivalTimestamp"] assert kinesis.data == kinesis_raw["data"]