diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 00000000000..cf0445d7d27 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,12 @@ +[MESSAGES CONTROL] +disable= + too-many-arguments, + too-many-instance-attributes, + too-few-public-methods, + anomalous-backslash-in-string, + missing-class-docstring, + missing-module-docstring, + missing-function-docstring, + +[FORMAT] +max-line-length=120 diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py index 29694eacd97..4682711af92 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py @@ -234,10 +234,12 @@ def raw_query_string(self) -> str: @property def cookies(self) -> List[str]: + """Cookies""" return self["cookies"] @property def headers(self) -> Dict[str, str]: + """Http headers""" return self["headers"] @property @@ -314,6 +316,8 @@ def asdict(self) -> dict: class HttpVerb(enum.Enum): + """Enum of http methods / verbs""" + GET = "GET" POST = "POST" PUT = "PUT" @@ -324,15 +328,32 @@ class HttpVerb(enum.Enum): ALL = "*" +DENY_ALL_RESPONSE = { + "principalId": "deny-all-user", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Deny", + "Resource": ["*"], + } + ], + }, +} + + class APIGatewayAuthorizerResponse: - """Api Gateway HTTP API V1 payload or Rest api authorizer response helper + """The IAM Policy Response required for API Gateway REST APIs and HTTP APIs. Based on: - https://github.com/awslabs/aws-apigateway-lambda-authorizer-blueprints/blob/\ master/blueprints/python/api-gateway-authorizer-python.py - """ - version = "2012-10-17" - """The policy version used for the evaluation. This should always be '2012-10-17'""" + Documentation: + ------------- + - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html + - https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-lambda-authorizer-output.html + """ path_regex = r"^[/.a-zA-Z0-9-\*]+$" """The regular expression used to validate resource paths for the policy""" @@ -345,6 +366,7 @@ def __init__( api_id: str, stage: str, context: Optional[Dict] = None, + usage_identifier_key: Optional[str] = None, ): """ Parameters @@ -373,6 +395,10 @@ def __init__( context : Dict, optional Optional, context. Note: only names of type string and values of type int, string or boolean are supported + usage_identifier_key: str, optional + If the API uses a usage plan (the apiKeySource is set to `AUTHORIZER`), the Lambda authorizer function + must return one of the usage plan's API keys as the usageIdentifierKey property value. + > **Note:** This only applies for REST APIs. """ self.principal_id = principal_id self.region = region @@ -380,25 +406,46 @@ def __init__( self.api_id = api_id self.stage = stage self.context = context + self.usage_identifier_key = usage_identifier_key self._allow_routes: List[Dict] = [] self._deny_routes: List[Dict] = [] + self._resource_pattern = re.compile(self.path_regex) - def _add_route(self, effect: str, verb: str, resource: str, conditions: List[Dict]): + @staticmethod + def from_route_arn( + arn: str, + principal_id: str, + context: Optional[Dict] = None, + usage_identifier_key: Optional[str] = None, + ) -> "APIGatewayAuthorizerResponse": + parsed_arn = parse_api_gateway_arn(arn) + return APIGatewayAuthorizerResponse( + principal_id, + parsed_arn.region, + parsed_arn.aws_account_id, + parsed_arn.api_id, + parsed_arn.stage, + context, + usage_identifier_key, + ) + + def _add_route(self, effect: str, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): """Adds a route to the internal lists of allowed or denied routes. Each object in the internal list contains a resource ARN and a condition statement. The condition statement can be null.""" - if verb != "*" and verb not in HttpVerb.__members__: + if http_method != "*" and http_method not in HttpVerb.__members__: allowed_values = [verb.value for verb in HttpVerb] - raise ValueError(f"Invalid HTTP verb: '{verb}'. Use either '{allowed_values}'") + raise ValueError(f"Invalid HTTP verb: '{http_method}'. Use either '{allowed_values}'") - resource_pattern = re.compile(self.path_regex) - if not resource_pattern.match(resource): + if not self._resource_pattern.match(resource): raise ValueError(f"Invalid resource path: {resource}. Path should match {self.path_regex}") if resource[:1] == "/": resource = resource[1:] - resource_arn = APIGatewayRouteArn(self.region, self.aws_account_id, self.api_id, self.stage, verb, resource).arn + resource_arn = APIGatewayRouteArn( + self.region, self.aws_account_id, self.api_id, self.stage, http_method, resource + ).arn route = {"resourceArn": resource_arn, "conditions": conditions} @@ -412,24 +459,27 @@ def _get_empty_statement(effect: str) -> Dict[str, Any]: """Returns an empty statement object prepopulated with the correct action and the desired effect.""" return {"Action": "execute-api:Invoke", "Effect": effect.capitalize(), "Resource": []} - def _get_statement_for_effect(self, effect: str, methods: List) -> List: - """This function loops over an array of objects containing a resourceArn and - conditions statement and generates the array of statements for the policy.""" - if len(methods) == 0: + def _get_statement_for_effect(self, effect: str, routes: List[Dict]) -> List[Dict]: + """This function loops over an array of objects containing a `resourceArn` and + `conditions` statement and generates the array of statements for the policy.""" + if not routes: return [] - statements = [] - + statements: List[Dict] = [] statement = self._get_empty_statement(effect) - for method in methods: - if method["conditions"] is None or len(method["conditions"]) == 0: - statement["Resource"].append(method["resourceArn"]) - else: + + for route in routes: + resource_arn = route["resourceArn"] + conditions = route.get("conditions") + if conditions is not None and len(conditions) > 0: conditional_statement = self._get_empty_statement(effect) - conditional_statement["Resource"].append(method["resourceArn"]) - conditional_statement["Condition"] = method["conditions"] + conditional_statement["Resource"].append(resource_arn) + conditional_statement["Condition"] = conditions statements.append(conditional_statement) + else: + statement["Resource"].append(resource_arn) + if len(statement["Resource"]) > 0: statements.append(statement) @@ -442,7 +492,7 @@ def allow_all_routes(self, http_method: str = HttpVerb.ALL.value): ---------- http_method: str """ - self._add_route(effect="Allow", verb=http_method, resource="*", conditions=[]) + self._add_route(effect="Allow", http_method=http_method, resource="*") def deny_all_routes(self, http_method: str = HttpVerb.ALL.value): """Adds a '*' allow to the policy to deny access to all methods of an API @@ -452,7 +502,7 @@ def deny_all_routes(self, http_method: str = HttpVerb.ALL.value): http_method: str """ - self._add_route(effect="Deny", verb=http_method, resource="*", conditions=[]) + self._add_route(effect="Deny", http_method=http_method, resource="*") def allow_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): """Adds an API Gateway method (Http verb + Resource path) to the list of allowed @@ -460,8 +510,7 @@ def allow_route(self, http_method: str, resource: str, conditions: Optional[List Optionally includes a condition for the policy statement. More on AWS policy conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" - conditions = conditions or [] - self._add_route(effect="Allow", verb=http_method, resource=resource, conditions=conditions) + self._add_route(effect="Allow", http_method=http_method, resource=resource, conditions=conditions) def deny_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): """Adds an API Gateway method (Http verb + Resource path) to the list of denied @@ -469,8 +518,7 @@ def deny_route(self, http_method: str, resource: str, conditions: Optional[List[ Optionally includes a condition for the policy statement. More on AWS policy conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" - conditions = conditions or [] - self._add_route(effect="Deny", verb=http_method, resource=resource, conditions=conditions) + self._add_route(effect="Deny", http_method=http_method, resource=resource, conditions=conditions) def asdict(self) -> Dict[str, Any]: """Generates the policy document based on the internal lists of allowed and denied @@ -482,12 +530,15 @@ def asdict(self) -> Dict[str, Any]: response: Dict[str, Any] = { "principalId": self.principal_id, - "policyDocument": {"Version": self.version, "Statement": []}, + "policyDocument": {"Version": "2012-10-17", "Statement": []}, } response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Allow", self._allow_routes)) response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Deny", self._deny_routes)) + if self.usage_identifier_key: + response["usageIdentifierKey"] = self.usage_identifier_key + if self.context: response["context"] = self.context diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 6cd487a2092..9c1d4676777 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -96,9 +96,10 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users. - ```python hl_lines="2-5 26-31 36-37 40 44 46" + ```python hl_lines="2-6 29 36-42 47 49" from aws_lambda_powertools.utilities.data_classes import event_source from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + DENY_ALL_RESPONSE, APIGatewayAuthorizerRequestEvent, APIGatewayAuthorizerResponse, HttpVerb, @@ -108,9 +109,9 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA def get_user_by_token(token): if compare_digest(token, "admin-foo"): - return {"isAdmin": True, "name": "Admin"} + return {"id": 0, "name": "Admin", "isAdmin": True} elif compare_digest(token, "regular-foo"): - return {"name": "Joe"} + return {"id": 1, "name": "Joe"} else: return None @@ -119,25 +120,27 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA def handler(event: APIGatewayAuthorizerRequestEvent, context): user = get_user_by_token(event.get_header_value("Authorization")) + if user is None: + # No user was found + # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics + # raise Exception("Unauthorized") + # to return 403 - `{"message":"Forbidden"}` + return DENY_ALL_RESPONSE + # parse the `methodArn` as an `APIGatewayRouteArn` arn = event.parsed_arn + # Create the response builder from parts of the `methodArn` + # and set the logged in user id and context policy = APIGatewayAuthorizerResponse( - principal_id="user", + principal_id=user["id"], + context=user, region=arn.region, aws_account_id=arn.aws_account_id, api_id=arn.api_id, - stage=arn.stage + stage=arn.stage, ) - if user is None: - # No user was found, so we return not authorized - policy.deny_all_routes() - return policy.asdict() - - # Found the user and setting the details in the context - policy.context = user - # Conditional IAM Policy if user.get("isAdmin", False): policy.allow_all_routes() diff --git a/tests/functional/data_classes/test_api_gateway_authorizer.py b/tests/functional/data_classes/test_api_gateway_authorizer.py index 7dac6cb7791..b7584ccc4a8 100644 --- a/tests/functional/data_classes/test_api_gateway_authorizer.py +++ b/tests/functional/data_classes/test_api_gateway_authorizer.py @@ -1,6 +1,7 @@ import pytest from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + DENY_ALL_RESPONSE, APIGatewayAuthorizerResponse, HttpVerb, ) @@ -36,7 +37,8 @@ def test_authorizer_response_invalid_resource(builder: APIGatewayAuthorizerRespo def test_authorizer_response_allow_all_routes_with_context(): - builder = APIGatewayAuthorizerResponse("foo", "us-west-1", "123456789", "fantom", "dev", {"name": "Foo"}) + arn = "arn:aws:execute-api:us-west-1:123456789:fantom/dev/GET/foo" + builder = APIGatewayAuthorizerResponse.from_route_arn(arn, principal_id="foo", context={"name": "Foo"}) builder.allow_all_routes() assert builder.asdict() == { "principalId": "foo", @@ -54,6 +56,26 @@ def test_authorizer_response_allow_all_routes_with_context(): } +def test_authorizer_response_allow_all_routes_with_usage_identifier_key(): + arn = "arn:aws:execute-api:us-east-1:1111111111:api/dev/ANY/y" + builder = APIGatewayAuthorizerResponse.from_route_arn(arn, principal_id="cow", usage_identifier_key="key") + builder.allow_all_routes() + assert builder.asdict() == { + "principalId": "cow", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Allow", + "Resource": ["arn:aws:execute-api:us-east-1:1111111111:api/dev/*/*"], + } + ], + }, + "usageIdentifierKey": "key", + } + + def test_authorizer_response_deny_all_routes(builder: APIGatewayAuthorizerResponse): builder.deny_all_routes() assert builder.asdict() == { @@ -145,3 +167,14 @@ def test_authorizer_response_deny_route_with_conditions(builder: APIGatewayAutho ], }, } + + +def test_deny_all(): + # CHECK we always explicitly deny all + statements = DENY_ALL_RESPONSE["policyDocument"]["Statement"] + assert len(statements) == 1 + assert statements[0] == { + "Action": "execute-api:Invoke", + "Effect": "Deny", + "Resource": ["*"], + }