Skip to content

refactor(data-classes): clean up internal logic for APIGatewayAuthorizerResponse #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[MESSAGES CONTROL]
disable=
too-many-arguments,
too-many-instance-attributes,
too-few-public-methods,
anomalous-backslash-in-string,
missing-class-docstring,
missing-module-docstring,
missing-function-docstring,

[FORMAT]
max-line-length=120
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ def raw_query_string(self) -> str:

@property
def cookies(self) -> List[str]:
"""Cookies"""
return self["cookies"]

@property
def headers(self) -> Dict[str, str]:
"""Http headers"""
return self["headers"]

@property
Expand Down Expand Up @@ -314,6 +316,8 @@ def asdict(self) -> dict:


class HttpVerb(enum.Enum):
"""Enum of http methods / verbs"""

GET = "GET"
POST = "POST"
PUT = "PUT"
Expand All @@ -324,15 +328,32 @@ class HttpVerb(enum.Enum):
ALL = "*"


DENY_ALL_RESPONSE = {
"principalId": "deny-all-user",
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Deny",
"Resource": ["*"],
}
],
},
}


class APIGatewayAuthorizerResponse:
"""Api Gateway HTTP API V1 payload or Rest api authorizer response helper
"""The IAM Policy Response required for API Gateway REST APIs and HTTP APIs.

Based on: - https://github.com/awslabs/aws-apigateway-lambda-authorizer-blueprints/blob/\
master/blueprints/python/api-gateway-authorizer-python.py
"""

version = "2012-10-17"
"""The policy version used for the evaluation. This should always be '2012-10-17'"""
Documentation:
-------------
- https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html
- https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-lambda-authorizer-output.html
"""

path_regex = r"^[/.a-zA-Z0-9-\*]+$"
"""The regular expression used to validate resource paths for the policy"""
Expand All @@ -345,6 +366,7 @@ def __init__(
api_id: str,
stage: str,
context: Optional[Dict] = None,
usage_identifier_key: Optional[str] = None,
):
"""
Parameters
Expand Down Expand Up @@ -373,32 +395,57 @@ def __init__(
context : Dict, optional
Optional, context.
Note: only names of type string and values of type int, string or boolean are supported
usage_identifier_key: str, optional
If the API uses a usage plan (the apiKeySource is set to `AUTHORIZER`), the Lambda authorizer function
must return one of the usage plan's API keys as the usageIdentifierKey property value.
> **Note:** This only applies for REST APIs.
"""
self.principal_id = principal_id
self.region = region
self.aws_account_id = aws_account_id
self.api_id = api_id
self.stage = stage
self.context = context
self.usage_identifier_key = usage_identifier_key
self._allow_routes: List[Dict] = []
self._deny_routes: List[Dict] = []
self._resource_pattern = re.compile(self.path_regex)

def _add_route(self, effect: str, verb: str, resource: str, conditions: List[Dict]):
@staticmethod
def from_route_arn(
arn: str,
principal_id: str,
context: Optional[Dict] = None,
usage_identifier_key: Optional[str] = None,
) -> "APIGatewayAuthorizerResponse":
parsed_arn = parse_api_gateway_arn(arn)
return APIGatewayAuthorizerResponse(
principal_id,
parsed_arn.region,
parsed_arn.aws_account_id,
parsed_arn.api_id,
parsed_arn.stage,
context,
usage_identifier_key,
)

def _add_route(self, effect: str, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds a route to the internal lists of allowed or denied routes. Each object in
the internal list contains a resource ARN and a condition statement. The condition
statement can be null."""
if verb != "*" and verb not in HttpVerb.__members__:
if http_method != "*" and http_method not in HttpVerb.__members__:
allowed_values = [verb.value for verb in HttpVerb]
raise ValueError(f"Invalid HTTP verb: '{verb}'. Use either '{allowed_values}'")
raise ValueError(f"Invalid HTTP verb: '{http_method}'. Use either '{allowed_values}'")

resource_pattern = re.compile(self.path_regex)
if not resource_pattern.match(resource):
if not self._resource_pattern.match(resource):
raise ValueError(f"Invalid resource path: {resource}. Path should match {self.path_regex}")

if resource[:1] == "/":
resource = resource[1:]

resource_arn = APIGatewayRouteArn(self.region, self.aws_account_id, self.api_id, self.stage, verb, resource).arn
resource_arn = APIGatewayRouteArn(
self.region, self.aws_account_id, self.api_id, self.stage, http_method, resource
).arn

route = {"resourceArn": resource_arn, "conditions": conditions}

Expand All @@ -412,24 +459,27 @@ def _get_empty_statement(effect: str) -> Dict[str, Any]:
"""Returns an empty statement object prepopulated with the correct action and the desired effect."""
return {"Action": "execute-api:Invoke", "Effect": effect.capitalize(), "Resource": []}

def _get_statement_for_effect(self, effect: str, methods: List) -> List:
"""This function loops over an array of objects containing a resourceArn and
conditions statement and generates the array of statements for the policy."""
if len(methods) == 0:
def _get_statement_for_effect(self, effect: str, routes: List[Dict]) -> List[Dict]:
"""This function loops over an array of objects containing a `resourceArn` and
`conditions` statement and generates the array of statements for the policy."""
if not routes:
return []

statements = []

statements: List[Dict] = []
statement = self._get_empty_statement(effect)
for method in methods:
if method["conditions"] is None or len(method["conditions"]) == 0:
statement["Resource"].append(method["resourceArn"])
else:

for route in routes:
resource_arn = route["resourceArn"]
conditions = route.get("conditions")
if conditions is not None and len(conditions) > 0:
conditional_statement = self._get_empty_statement(effect)
conditional_statement["Resource"].append(method["resourceArn"])
conditional_statement["Condition"] = method["conditions"]
conditional_statement["Resource"].append(resource_arn)
conditional_statement["Condition"] = conditions
statements.append(conditional_statement)

else:
statement["Resource"].append(resource_arn)

if len(statement["Resource"]) > 0:
statements.append(statement)

Expand All @@ -442,7 +492,7 @@ def allow_all_routes(self, http_method: str = HttpVerb.ALL.value):
----------
http_method: str
"""
self._add_route(effect="Allow", verb=http_method, resource="*", conditions=[])
self._add_route(effect="Allow", http_method=http_method, resource="*")

def deny_all_routes(self, http_method: str = HttpVerb.ALL.value):
"""Adds a '*' allow to the policy to deny access to all methods of an API
Expand All @@ -452,25 +502,23 @@ def deny_all_routes(self, http_method: str = HttpVerb.ALL.value):
http_method: str
"""

self._add_route(effect="Deny", verb=http_method, resource="*", conditions=[])
self._add_route(effect="Deny", http_method=http_method, resource="*")

def allow_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods for the policy.

Optionally includes a condition for the policy statement. More on AWS policy
conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
conditions = conditions or []
self._add_route(effect="Allow", verb=http_method, resource=resource, conditions=conditions)
self._add_route(effect="Allow", http_method=http_method, resource=resource, conditions=conditions)

def deny_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods for the policy.

Optionally includes a condition for the policy statement. More on AWS policy
conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
conditions = conditions or []
self._add_route(effect="Deny", verb=http_method, resource=resource, conditions=conditions)
self._add_route(effect="Deny", http_method=http_method, resource=resource, conditions=conditions)

def asdict(self) -> Dict[str, Any]:
"""Generates the policy document based on the internal lists of allowed and denied
Expand All @@ -482,12 +530,15 @@ def asdict(self) -> Dict[str, Any]:

response: Dict[str, Any] = {
"principalId": self.principal_id,
"policyDocument": {"Version": self.version, "Statement": []},
"policyDocument": {"Version": "2012-10-17", "Statement": []},
}

response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Allow", self._allow_routes))
response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Deny", self._deny_routes))

if self.usage_identifier_key:
response["usageIdentifierKey"] = self.usage_identifier_key

if self.context:
response["context"] = self.context

Expand Down
29 changes: 16 additions & 13 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA

When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.

```python hl_lines="2-5 26-31 36-37 40 44 46"
```python hl_lines="2-6 29 36-42 47 49"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
DENY_ALL_RESPONSE,
APIGatewayAuthorizerRequestEvent,
APIGatewayAuthorizerResponse,
HttpVerb,
Expand All @@ -108,9 +109,9 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA

def get_user_by_token(token):
if compare_digest(token, "admin-foo"):
return {"isAdmin": True, "name": "Admin"}
return {"id": 0, "name": "Admin", "isAdmin": True}
elif compare_digest(token, "regular-foo"):
return {"name": "Joe"}
return {"id": 1, "name": "Joe"}
else:
return None

Expand All @@ -119,25 +120,27 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA
def handler(event: APIGatewayAuthorizerRequestEvent, context):
user = get_user_by_token(event.get_header_value("Authorization"))

if user is None:
# No user was found
# to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
# raise Exception("Unauthorized")
# to return 403 - `{"message":"Forbidden"}`
return DENY_ALL_RESPONSE

# parse the `methodArn` as an `APIGatewayRouteArn`
arn = event.parsed_arn

# Create the response builder from parts of the `methodArn`
# and set the logged in user id and context
policy = APIGatewayAuthorizerResponse(
principal_id="user",
principal_id=user["id"],
context=user,
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage
stage=arn.stage,
)

if user is None:
# No user was found, so we return not authorized
policy.deny_all_routes()
return policy.asdict()

# Found the user and setting the details in the context
policy.context = user

# Conditional IAM Policy
if user.get("isAdmin", False):
policy.allow_all_routes()
Expand Down
35 changes: 34 additions & 1 deletion tests/functional/data_classes/test_api_gateway_authorizer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest

from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
DENY_ALL_RESPONSE,
APIGatewayAuthorizerResponse,
HttpVerb,
)
Expand Down Expand Up @@ -36,7 +37,8 @@ def test_authorizer_response_invalid_resource(builder: APIGatewayAuthorizerRespo


def test_authorizer_response_allow_all_routes_with_context():
builder = APIGatewayAuthorizerResponse("foo", "us-west-1", "123456789", "fantom", "dev", {"name": "Foo"})
arn = "arn:aws:execute-api:us-west-1:123456789:fantom/dev/GET/foo"
builder = APIGatewayAuthorizerResponse.from_route_arn(arn, principal_id="foo", context={"name": "Foo"})
builder.allow_all_routes()
assert builder.asdict() == {
"principalId": "foo",
Expand All @@ -54,6 +56,26 @@ def test_authorizer_response_allow_all_routes_with_context():
}


def test_authorizer_response_allow_all_routes_with_usage_identifier_key():
arn = "arn:aws:execute-api:us-east-1:1111111111:api/dev/ANY/y"
builder = APIGatewayAuthorizerResponse.from_route_arn(arn, principal_id="cow", usage_identifier_key="key")
builder.allow_all_routes()
assert builder.asdict() == {
"principalId": "cow",
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Allow",
"Resource": ["arn:aws:execute-api:us-east-1:1111111111:api/dev/*/*"],
}
],
},
"usageIdentifierKey": "key",
}


def test_authorizer_response_deny_all_routes(builder: APIGatewayAuthorizerResponse):
builder.deny_all_routes()
assert builder.asdict() == {
Expand Down Expand Up @@ -145,3 +167,14 @@ def test_authorizer_response_deny_route_with_conditions(builder: APIGatewayAutho
],
},
}


def test_deny_all():
# CHECK we always explicitly deny all
statements = DENY_ALL_RESPONSE["policyDocument"]["Statement"]
assert len(statements) == 1
assert statements[0] == {
"Action": "execute-api:Invoke",
"Effect": "Deny",
"Resource": ["*"],
}