Skip to content

refactor(data-classes): clean up internal logic for APIGatewayAuthorizerResponse #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,22 +374,22 @@ def __init__(
Optional, context.
Note: only names of type string and values of type int, string or boolean are supported
"""
self.principal_id = principal_id
self.region = region
self.aws_account_id = aws_account_id
self.api_id = api_id
self.stage = stage
self.principal_id = principal_id
self.context = context
self._allow_routes: List[Dict] = []
self._deny_routes: List[Dict] = []

def _add_route(self, effect: str, verb: str, resource: str, conditions: List[Dict]):
def _add_route(self, effect: str, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds a route to the internal lists of allowed or denied routes. Each object in
the internal list contains a resource ARN and a condition statement. The condition
statement can be null."""
if verb != "*" and verb not in HttpVerb.__members__:
if http_method != "*" and http_method not in HttpVerb.__members__:
allowed_values = [verb.value for verb in HttpVerb]
raise ValueError(f"Invalid HTTP verb: '{verb}'. Use either '{allowed_values}'")
raise ValueError(f"Invalid HTTP verb: '{http_method}'. Use either '{allowed_values}'")

resource_pattern = re.compile(self.path_regex)
if not resource_pattern.match(resource):
Expand All @@ -398,7 +398,9 @@ def _add_route(self, effect: str, verb: str, resource: str, conditions: List[Dic
if resource[:1] == "/":
resource = resource[1:]

resource_arn = APIGatewayRouteArn(self.region, self.aws_account_id, self.api_id, self.stage, verb, resource).arn
resource_arn = APIGatewayRouteArn(
self.region, self.aws_account_id, self.api_id, self.stage, http_method, resource
).arn

route = {"resourceArn": resource_arn, "conditions": conditions}

Expand All @@ -412,24 +414,27 @@ def _get_empty_statement(effect: str) -> Dict[str, Any]:
"""Returns an empty statement object prepopulated with the correct action and the desired effect."""
return {"Action": "execute-api:Invoke", "Effect": effect.capitalize(), "Resource": []}

def _get_statement_for_effect(self, effect: str, methods: List) -> List:
"""This function loops over an array of objects containing a resourceArn and
conditions statement and generates the array of statements for the policy."""
if len(methods) == 0:
def _get_statement_for_effect(self, effect: str, routes: List[Dict]) -> List[Dict]:
"""This function loops over an array of objects containing a `resourceArn` and
`conditions` statement and generates the array of statements for the policy."""
if len(routes) == 0:
return []

statements = []

statements: List[Dict] = []
statement = self._get_empty_statement(effect)
for method in methods:
if method["conditions"] is None or len(method["conditions"]) == 0:
statement["Resource"].append(method["resourceArn"])
else:

for route in routes:
resource_arn = route["resourceArn"]
conditions = route.get("conditions")
if conditions is not None and len(conditions) > 0:
conditional_statement = self._get_empty_statement(effect)
conditional_statement["Resource"].append(method["resourceArn"])
conditional_statement["Condition"] = method["conditions"]
conditional_statement["Resource"].append(resource_arn)
conditional_statement["Condition"] = conditions
statements.append(conditional_statement)

else:
statement["Resource"].append(resource_arn)

if len(statement["Resource"]) > 0:
statements.append(statement)

Expand All @@ -442,7 +447,7 @@ def allow_all_routes(self, http_method: str = HttpVerb.ALL.value):
----------
http_method: str
"""
self._add_route(effect="Allow", verb=http_method, resource="*", conditions=[])
self._add_route(effect="Allow", http_method=http_method, resource="*")

def deny_all_routes(self, http_method: str = HttpVerb.ALL.value):
"""Adds a '*' allow to the policy to deny access to all methods of an API
Expand All @@ -452,25 +457,23 @@ def deny_all_routes(self, http_method: str = HttpVerb.ALL.value):
http_method: str
"""

self._add_route(effect="Deny", verb=http_method, resource="*", conditions=[])
self._add_route(effect="Deny", http_method=http_method, resource="*")

def allow_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods for the policy.

Optionally includes a condition for the policy statement. More on AWS policy
conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
conditions = conditions or []
self._add_route(effect="Allow", verb=http_method, resource=resource, conditions=conditions)
self._add_route(effect="Allow", http_method=http_method, resource=resource, conditions=conditions)

def deny_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None):
"""Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods for the policy.

Optionally includes a condition for the policy statement. More on AWS policy
conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
conditions = conditions or []
self._add_route(effect="Deny", verb=http_method, resource=resource, conditions=conditions)
self._add_route(effect="Deny", http_method=http_method, resource=resource, conditions=conditions)

def asdict(self) -> Dict[str, Any]:
"""Generates the policy document based on the internal lists of allowed and denied
Expand Down
29 changes: 16 additions & 13 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA

When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.

```python hl_lines="2-5 26-31 36-37 40 44 46"
```python hl_lines="2-5 29 36-42 47 49"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerRequestEvent,
Expand All @@ -106,11 +106,15 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA
from secrets import compare_digest


class UnAuthorizedError(Exception):
...


def get_user_by_token(token):
if compare_digest(token, "admin-foo"):
return {"isAdmin": True, "name": "Admin"}
return {"id": 0, "name": "Admin", "isAdmin": True}
elif compare_digest(token, "regular-foo"):
return {"name": "Joe"}
return {"id": 1, "name": "Joe"}
else:
return None

Expand All @@ -119,25 +123,24 @@ Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayA
def handler(event: APIGatewayAuthorizerRequestEvent, context):
user = get_user_by_token(event.get_header_value("Authorization"))

if user is None:
# No user was found, so we raised a not authorized error
raise UnAuthorizedError("Not authorized to perform this action")

# parse the `methodArn` as an `APIGatewayRouteArn`
arn = event.parsed_arn

# Create the response builder from parts of the `methodArn`
# and set the logged in user id and context
policy = APIGatewayAuthorizerResponse(
principal_id="user",
principal_id=user["id"],
context=user,
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage
stage=arn.stage,
)

if user is None:
# No user was found, so we return not authorized
policy.deny_all_routes()
return policy.asdict()

# Found the user and setting the details in the context
policy.context = user

# Conditional IAM Policy
if user.get("isAdmin", False):
policy.allow_all_routes()
Expand Down