From 8f6cb1714e822818dd5889e4cee612309d303388 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 28 May 2024 22:42:16 +0100 Subject: [PATCH 01/10] Transforming List[Dict..List]]] in a hashed object --- .../event_handler/api_gateway.py | 5 ++- aws_lambda_powertools/event_handler/util.py | 21 ++++++++++ .../event_handler/test_openapi_security.py | 40 +++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/event_handler/api_gateway.py b/aws_lambda_powertools/event_handler/api_gateway.py index abbeadc5c41..6beefeaa768 100644 --- a/aws_lambda_powertools/event_handler/api_gateway.py +++ b/aws_lambda_powertools/event_handler/api_gateway.py @@ -43,7 +43,7 @@ validation_error_definition, validation_error_response_definition, ) -from aws_lambda_powertools.event_handler.util import _FrozenDict, extract_origin_header +from aws_lambda_powertools.event_handler.util import _FrozenDict, _FrozenListDict, extract_origin_header from aws_lambda_powertools.shared.cookies import Cookie from aws_lambda_powertools.shared.functions import powertools_dev_is_set from aws_lambda_powertools.shared.json_encoder import Encoder @@ -2386,6 +2386,7 @@ def register_route(func: Callable): methods = (method,) if isinstance(method, str) else tuple(method) frozen_responses = _FrozenDict(responses) if responses else None frozen_tags = frozenset(tags) if tags else None + frozen_security = _FrozenListDict(security) if security else None route_key = ( rule, @@ -2400,7 +2401,7 @@ def register_route(func: Callable): frozen_tags, operation_id, include_in_schema, - security, + frozen_security, ) # Collate Middleware for routes diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index 6f2caf10858..aea78ccdb1c 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -18,6 +18,27 @@ def __hash__(self): return hash(frozenset(self.keys())) +class _FrozenListDict(list[dict[str, list[str]]]): + """ + Freezes a list of dictionaries containing lists of strings. + + This function takes a list of dictionaries where the values are lists of strings and converts it into + a frozen set of frozen sets of frozen dictionaries. This is done by iterating over the input list, + converting each dictionary's values (lists of strings) into frozen sets of strings, and then + converting the resulting dictionary into a frozen dictionary. Finally, all these frozen dictionaries + are collected into a frozen set of frozen sets. + + This operation is useful when you want to ensure the immutability of the data structure and make it + hashable, which is required for certain operations like using it as a key in a dictionary or as an + element in a set. + + Example: [{"TestAuth": ["test", "test1"]}] + """ + + def __hash__(self): + return hash(frozenset({_FrozenDict({key: frozenset(self) for key, self in item.items()}) for item in self})) + + def extract_origin_header(resolver_headers: Dict[str, Any]): """ Extracts the 'origin' or 'Origin' header from the provided resolver headers. diff --git a/tests/functional/event_handler/test_openapi_security.py b/tests/functional/event_handler/test_openapi_security.py index 7120a815edd..d0e7d6d53d8 100644 --- a/tests/functional/event_handler/test_openapi_security.py +++ b/tests/functional/event_handler/test_openapi_security.py @@ -1,16 +1,19 @@ import pytest from aws_lambda_powertools.event_handler import APIGatewayRestResolver +from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.openapi.models import APIKey, APIKeyIn def test_openapi_top_level_security(): + # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() @app.get("/") def handler(): raise NotImplementedError() + # WHEN the get_openapi_schema method is called with a security scheme schema = app.get_openapi_schema( security_schemes={ "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), @@ -18,6 +21,7 @@ def handler(): security=[{"apiKey": []}], ) + # THEN the resulting schema should have security defined at the top level security = schema.security assert security is not None @@ -26,12 +30,15 @@ def handler(): def test_openapi_top_level_security_missing(): + # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() @app.get("/") def handler(): raise NotImplementedError() + # WHEN the get_openapi_schema method is called with security defined without security schemes + # THEN a ValueError should be raised with pytest.raises(ValueError): app.get_openapi_schema( security=[{"apiKey": []}], @@ -39,18 +46,51 @@ def handler(): def test_openapi_operation_security(): + # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() @app.get("/", security=[{"apiKey": []}]) def handler(): raise NotImplementedError() + # WHEN the get_openapi_schema method is called with security defined at the operation level schema = app.get_openapi_schema( security_schemes={ "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), }, ) + # THEN the resulting schema should have security defined at the operation level, not the top level + security = schema.security + assert security is None + + operation = schema.paths["/"].get + security = operation.security + assert security is not None + + assert len(security) == 1 + assert security[0] == {"apiKey": []} + + +def test_openapi_operation_security_with_router(): + # GIVEN an APIGatewayRestResolver instance with a Router + app = APIGatewayRestResolver() + router = Router() + + @router.get("/", security=[{"apiKey": []}]) + def handler(): + raise NotImplementedError() + + app.include_router(router) + + # WHEN the get_openapi_schema method is called with security defined at the operation level in the Router + schema = app.get_openapi_schema( + security_schemes={ + "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), + }, + ) + + # THEN the resulting schema should have security defined at the operation level security = schema.security assert security is None From 5705266ad61c2d1849745960dbc42a972ff6b2b7 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 28 May 2024 23:11:14 +0100 Subject: [PATCH 02/10] mypy.... --- aws_lambda_powertools/event_handler/api_gateway.py | 1 + aws_lambda_powertools/event_handler/util.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/event_handler/api_gateway.py b/aws_lambda_powertools/event_handler/api_gateway.py index 6beefeaa768..c00706ef125 100644 --- a/aws_lambda_powertools/event_handler/api_gateway.py +++ b/aws_lambda_powertools/event_handler/api_gateway.py @@ -703,6 +703,7 @@ def _openapi_operation_parameters( from aws_lambda_powertools.event_handler.openapi.params import Param parameters = [] + parameter: Dict[str, Any] for param in all_route_params: field_info = param.field_info field_info = cast(Param, field_info) diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index aea78ccdb1c..c7359dfacfc 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, FrozenSet, List from aws_lambda_powertools.utilities.data_classes.shared_functions import get_header_value @@ -18,7 +18,7 @@ def __hash__(self): return hash(frozenset(self.keys())) -class _FrozenListDict(list[dict[str, list[str]]]): +class _FrozenListDict(List[Dict[str, List[str]]]): """ Freezes a list of dictionaries containing lists of strings. @@ -36,7 +36,7 @@ class _FrozenListDict(list[dict[str, list[str]]]): """ def __hash__(self): - return hash(frozenset({_FrozenDict({key: frozenset(self) for key, self in item.items()}) for item in self})) + return hash(FrozenSet({_FrozenDict({key: FrozenSet(self) for key, self in item.items()}) for item in self})) def extract_origin_header(resolver_headers: Dict[str, Any]): From 340003beabc6a23fe4d86bafacbc26b364d825b7 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 28 May 2024 23:16:04 +0100 Subject: [PATCH 03/10] Typo --- aws_lambda_powertools/event_handler/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index c7359dfacfc..d61cd40d9e9 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, FrozenSet, List +from typing import Any, Dict, List from aws_lambda_powertools.utilities.data_classes.shared_functions import get_header_value @@ -36,7 +36,7 @@ class _FrozenListDict(List[Dict[str, List[str]]]): """ def __hash__(self): - return hash(FrozenSet({_FrozenDict({key: FrozenSet(self) for key, self in item.items()}) for item in self})) + return hash(frozenset({_FrozenDict({key: frozenset(self) for key, self in item.items()}) for item in self})) def extract_origin_header(resolver_headers: Dict[str, Any]): From 6a6144642c8bb2b056e1bff43f234a2f0bea0c02 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 6 Jun 2024 09:41:00 +0100 Subject: [PATCH 04/10] Addressing Heitor's feedback --- .../event_handler/test_openapi_security.py | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/tests/functional/event_handler/test_openapi_security.py b/tests/functional/event_handler/test_openapi_security.py index d0e7d6d53d8..fbc1f40530a 100644 --- a/tests/functional/event_handler/test_openapi_security.py +++ b/tests/functional/event_handler/test_openapi_security.py @@ -61,15 +61,10 @@ def handler(): ) # THEN the resulting schema should have security defined at the operation level, not the top level - security = schema.security - assert security is None - - operation = schema.paths["/"].get - security = operation.security - assert security is not None - - assert len(security) == 1 - assert security[0] == {"apiKey": []} + top_level_security = schema.security + path_level_security = schema.paths["/"].get.security + assert top_level_security is None + assert path_level_security[0] == {"apiKey": []} def test_openapi_operation_security_with_router(): @@ -91,12 +86,7 @@ def handler(): ) # THEN the resulting schema should have security defined at the operation level - security = schema.security - assert security is None - - operation = schema.paths["/"].get - security = operation.security - assert security is not None - - assert len(security) == 1 - assert security[0] == {"apiKey": []} + top_level_security = schema.security + path_level_security = schema.paths["/"].get.security + assert top_level_security is None + assert path_level_security[0] == {"apiKey": []} From 694952f0d4c9f8ea67b4934c44fad75205497c6f Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 6 Jun 2024 13:25:54 +0100 Subject: [PATCH 05/10] Addressing Heitor's feedback --- aws_lambda_powertools/event_handler/util.py | 5 ++++- .../event_handler/test_openapi_security.py | 15 +++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index d61cd40d9e9..ab6d68533df 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -36,7 +36,10 @@ class _FrozenListDict(List[Dict[str, List[str]]]): """ def __hash__(self): - return hash(frozenset({_FrozenDict({key: frozenset(self) for key, self in item.items()}) for item in self})) + hashable_items = [] + for item in self: + hashable_items.extend((key, frozenset(value)) for key, value in item.items()) + return hash(frozenset(hashable_items)) def extract_origin_header(resolver_headers: Dict[str, Any]): diff --git a/tests/functional/event_handler/test_openapi_security.py b/tests/functional/event_handler/test_openapi_security.py index fbc1f40530a..c39bac15f7f 100644 --- a/tests/functional/event_handler/test_openapi_security.py +++ b/tests/functional/event_handler/test_openapi_security.py @@ -32,6 +32,7 @@ def handler(): def test_openapi_top_level_security_missing(): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() + app.enable_swagger() @app.get("/") def handler(): @@ -48,17 +49,14 @@ def handler(): def test_openapi_operation_security(): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() + security_schemes = {"apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header)} @app.get("/", security=[{"apiKey": []}]) def handler(): raise NotImplementedError() # WHEN the get_openapi_schema method is called with security defined at the operation level - schema = app.get_openapi_schema( - security_schemes={ - "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), - }, - ) + schema = app.get_openapi_schema(security_schemes=security_schemes) # THEN the resulting schema should have security defined at the operation level, not the top level top_level_security = schema.security @@ -71,6 +69,7 @@ def test_openapi_operation_security_with_router(): # GIVEN an APIGatewayRestResolver instance with a Router app = APIGatewayRestResolver() router = Router() + security_schemes = {"apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header)} @router.get("/", security=[{"apiKey": []}]) def handler(): @@ -79,11 +78,7 @@ def handler(): app.include_router(router) # WHEN the get_openapi_schema method is called with security defined at the operation level in the Router - schema = app.get_openapi_schema( - security_schemes={ - "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), - }, - ) + schema = app.get_openapi_schema(security_schemes=security_schemes) # THEN the resulting schema should have security defined at the operation level top_level_security = schema.security From 0756711209fce6f7b4f6198faedc8e38b4a6a078 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 6 Jun 2024 21:05:32 +0100 Subject: [PATCH 06/10] Refactoring logic + adding validation when using security in operation level --- .../event_handler/api_gateway.py | 29 +++++-- .../event_handler/openapi/exceptions.py | 6 ++ aws_lambda_powertools/event_handler/util.py | 42 ++++++++-- tests/functional/event_handler/conftest.py | 6 ++ .../event_handler/test_openapi_security.py | 76 ++++++++++++++----- 5 files changed, 129 insertions(+), 30 deletions(-) diff --git a/aws_lambda_powertools/event_handler/api_gateway.py b/aws_lambda_powertools/event_handler/api_gateway.py index c00706ef125..db132e3bcb6 100644 --- a/aws_lambda_powertools/event_handler/api_gateway.py +++ b/aws_lambda_powertools/event_handler/api_gateway.py @@ -33,7 +33,7 @@ from aws_lambda_powertools.event_handler import content_types from aws_lambda_powertools.event_handler.exceptions import NotFoundError, ServiceError from aws_lambda_powertools.event_handler.openapi.constants import DEFAULT_API_VERSION, DEFAULT_OPENAPI_VERSION -from aws_lambda_powertools.event_handler.openapi.exceptions import RequestValidationError +from aws_lambda_powertools.event_handler.openapi.exceptions import RequestValidationError, SchemaValidationError from aws_lambda_powertools.event_handler.openapi.types import ( COMPONENT_REF_PREFIX, METHODS_WITH_BODY, @@ -43,7 +43,12 @@ validation_error_definition, validation_error_response_definition, ) -from aws_lambda_powertools.event_handler.util import _FrozenDict, _FrozenListDict, extract_origin_header +from aws_lambda_powertools.event_handler.util import ( + _FrozenDict, + _FrozenListDict, + extract_origin_header, + validate_openapi_security_parameters, +) from aws_lambda_powertools.shared.cookies import Cookie from aws_lambda_powertools.shared.functions import powertools_dev_is_set from aws_lambda_powertools.shared.json_encoder import Encoder @@ -1589,6 +1594,15 @@ def get_openapi_schema( # Add routes to the OpenAPI schema for route in all_routes: + + if route.security and not validate_openapi_security_parameters( + security=route.security, + security_schemes=security_schemes, + ): + raise SchemaValidationError( + "Security configuration was not found in security_schemas or security_schema was not defined.", + ) + if not route.include_in_schema: continue @@ -1631,15 +1645,14 @@ def _get_openapi_security( security: Optional[List[Dict[str, List[str]]]], security_schemes: Optional[Dict[str, "SecurityScheme"]], ) -> Optional[List[Dict[str, List[str]]]]: + if not security: return None - if not security_schemes: - raise ValueError("security_schemes must be provided if security is provided") - - # Check if all keys in security are present in the security_schemes - if any(key not in security_schemes for sec in security for key in sec): - raise ValueError("Some security schemes not found in security_schemes") + if not validate_openapi_security_parameters(security=security, security_schemes=security_schemes): + raise SchemaValidationError( + "Security configuration was not found in security_schemas or security_schema was not defined.", + ) return security diff --git a/aws_lambda_powertools/event_handler/openapi/exceptions.py b/aws_lambda_powertools/event_handler/openapi/exceptions.py index fdd829ba9b1..5d81d3af439 100644 --- a/aws_lambda_powertools/event_handler/openapi/exceptions.py +++ b/aws_lambda_powertools/event_handler/openapi/exceptions.py @@ -21,3 +21,9 @@ class RequestValidationError(ValidationException): def __init__(self, errors: Sequence[Any], *, body: Any = None) -> None: super().__init__(errors) self.body = body + + +class SchemaValidationError(ValidationException): + """ + Raised when the OpenAPI schema validation fails + """ diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index ab6d68533df..8fe1178ce82 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -1,5 +1,6 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional +from aws_lambda_powertools.event_handler.openapi.models import SecurityScheme from aws_lambda_powertools.utilities.data_classes.shared_functions import get_header_value @@ -48,11 +49,15 @@ def extract_origin_header(resolver_headers: Dict[str, Any]): The 'origin' or 'Origin' header can be either a single header or a multi-header. - Args: - resolver_headers (Dict): A dictionary containing the headers. + Parameters + ---------- + resolver_headers: Dict + A dictionary containing the headers. - Returns: - Optional[str]: The value(s) of the origin header or None. + Returns + ------- + Optional[str] + The value(s) of the origin header or None. """ resolved_header = get_header_value( headers=resolver_headers, @@ -64,3 +69,30 @@ def extract_origin_header(resolver_headers: Dict[str, Any]): return resolved_header[0] return resolved_header + + +def validate_openapi_security_parameters( + security: List[Dict[str, List[str]]], + security_schemes: Optional[Dict[str, "SecurityScheme"]], +) -> bool: + """ + Validates the security parameters based on the provided security schemes. + + Parameters + ---------- + security: List[Dict[str, List[str]]] + A list of security requirements + security_schemes: Optional[Dict[str, "SecurityScheme"]] + A dictionary mapping security scheme names to their corresponding security scheme objects. + + Returns + ------- + bool + True if all security scheme names in the `security` parameter are present in the `security_schemes` parameter, + False otherwise. + + """ + + return bool( + security_schemes and all(key in security_schemes for sec in security for key in sec), + ) diff --git a/tests/functional/event_handler/conftest.py b/tests/functional/event_handler/conftest.py index 3897c26fd30..a099ae4cea5 100644 --- a/tests/functional/event_handler/conftest.py +++ b/tests/functional/event_handler/conftest.py @@ -3,6 +3,7 @@ import fastjsonschema import pytest +from aws_lambda_powertools.event_handler.openapi.models import APIKey, APIKeyIn from tests.functional.utils import load_event @@ -114,3 +115,8 @@ def openapi31_schema(): data, use_formats=False, ) + + +@pytest.fixture +def security_scheme(): + return {"apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header)} diff --git a/tests/functional/event_handler/test_openapi_security.py b/tests/functional/event_handler/test_openapi_security.py index c39bac15f7f..f72b9362ed0 100644 --- a/tests/functional/event_handler/test_openapi_security.py +++ b/tests/functional/event_handler/test_openapi_security.py @@ -2,10 +2,10 @@ from aws_lambda_powertools.event_handler import APIGatewayRestResolver from aws_lambda_powertools.event_handler.api_gateway import Router -from aws_lambda_powertools.event_handler.openapi.models import APIKey, APIKeyIn +from aws_lambda_powertools.event_handler.openapi.exceptions import SchemaValidationError -def test_openapi_top_level_security(): +def test_openapi_top_level_security(security_scheme): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() @@ -14,12 +14,7 @@ def handler(): raise NotImplementedError() # WHEN the get_openapi_schema method is called with a security scheme - schema = app.get_openapi_schema( - security_schemes={ - "apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header), - }, - security=[{"apiKey": []}], - ) + schema = app.get_openapi_schema(security_schemes=security_scheme, security=[{"apiKey": []}]) # THEN the resulting schema should have security defined at the top level security = schema.security @@ -32,31 +27,47 @@ def handler(): def test_openapi_top_level_security_missing(): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() - app.enable_swagger() @app.get("/") def handler(): raise NotImplementedError() # WHEN the get_openapi_schema method is called with security defined without security schemes - # THEN a ValueError should be raised - with pytest.raises(ValueError): + # THEN a SchemaValidationError should be raised + with pytest.raises(SchemaValidationError): app.get_openapi_schema( security=[{"apiKey": []}], ) -def test_openapi_operation_security(): +def test_openapi_top_level_security_mismatch(security_scheme): + # GIVEN an APIGatewayRestResolver instance + app = APIGatewayRestResolver() + + @app.get("/") + def handler(): + raise NotImplementedError() + + # WHEN the get_openapi_schema method is called with security defined security schemes as APIKey + # WHEN top level security is defined as HTTPBearer + # THEN a SchemaValidationError should be raised + with pytest.raises(SchemaValidationError): + app.get_openapi_schema( + security_schemes=security_scheme, + security=[{"HTTPBearer": []}], + ) + + +def test_openapi_operation_level_security(security_scheme): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() - security_schemes = {"apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header)} @app.get("/", security=[{"apiKey": []}]) def handler(): raise NotImplementedError() # WHEN the get_openapi_schema method is called with security defined at the operation level - schema = app.get_openapi_schema(security_schemes=security_schemes) + schema = app.get_openapi_schema(security_schemes=security_scheme) # THEN the resulting schema should have security defined at the operation level, not the top level top_level_security = schema.security @@ -65,11 +76,42 @@ def handler(): assert path_level_security[0] == {"apiKey": []} -def test_openapi_operation_security_with_router(): +def test_openapi_operation_level_security_missing(): + # GIVEN an APIGatewayRestResolver instance + app = APIGatewayRestResolver() + + # WHEN we define a security in operation + @app.get("/", security=[{"apiKey": []}]) + def handler(): + raise NotImplementedError() + + # WHEN the get_openapi_schema method is called without security schemes defined + # THEN a SchemaValidationError should be raised + with pytest.raises(SchemaValidationError): + app.get_openapi_schema() + + +def test_openapi_operation_level_security_mismatch(security_scheme): + # GIVEN an APIGatewayRestResolver instance + app = APIGatewayRestResolver() + + # WHEN we define a security in operation with value HTTPBearer + @app.get("/", security=[{"HTTPBearer": []}]) + def handler(): + raise NotImplementedError() + + # WHEN the get_openapi_schema method is called with security defined security schemes as APIKey + # THEN a SchemaValidationError should be raised + with pytest.raises(SchemaValidationError): + app.get_openapi_schema( + security_schemes=security_scheme, + ) + + +def test_openapi_operation_level_security_with_router(security_scheme): # GIVEN an APIGatewayRestResolver instance with a Router app = APIGatewayRestResolver() router = Router() - security_schemes = {"apiKey": APIKey(name="X-API-KEY", description="API Key", in_=APIKeyIn.header)} @router.get("/", security=[{"apiKey": []}]) def handler(): @@ -78,7 +120,7 @@ def handler(): app.include_router(router) # WHEN the get_openapi_schema method is called with security defined at the operation level in the Router - schema = app.get_openapi_schema(security_schemes=security_schemes) + schema = app.get_openapi_schema(security_schemes=security_scheme) # THEN the resulting schema should have security defined at the operation level top_level_security = schema.security From ce070f348332c33e0d5a2cf2046d8b3bb9f04769 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 7 Jun 2024 13:23:56 +0100 Subject: [PATCH 07/10] Addressing Heitor's feedback --- .../event_handler/api_gateway.py | 6 +++--- aws_lambda_powertools/event_handler/util.py | 18 ++++++++++-------- .../event_handler/test_openapi_security.py | 6 +++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/aws_lambda_powertools/event_handler/api_gateway.py b/aws_lambda_powertools/event_handler/api_gateway.py index db132e3bcb6..d9735af4434 100644 --- a/aws_lambda_powertools/event_handler/api_gateway.py +++ b/aws_lambda_powertools/event_handler/api_gateway.py @@ -46,8 +46,8 @@ from aws_lambda_powertools.event_handler.util import ( _FrozenDict, _FrozenListDict, + _validate_openapi_security_parameters, extract_origin_header, - validate_openapi_security_parameters, ) from aws_lambda_powertools.shared.cookies import Cookie from aws_lambda_powertools.shared.functions import powertools_dev_is_set @@ -1595,7 +1595,7 @@ def get_openapi_schema( # Add routes to the OpenAPI schema for route in all_routes: - if route.security and not validate_openapi_security_parameters( + if route.security and not _validate_openapi_security_parameters( security=route.security, security_schemes=security_schemes, ): @@ -1649,7 +1649,7 @@ def _get_openapi_security( if not security: return None - if not validate_openapi_security_parameters(security=security, security_schemes=security_schemes): + if not _validate_openapi_security_parameters(security=security, security_schemes=security_schemes): raise SchemaValidationError( "Security configuration was not found in security_schemas or security_schema was not defined.", ) diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index 8fe1178ce82..1474961af55 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -71,12 +71,14 @@ def extract_origin_header(resolver_headers: Dict[str, Any]): return resolved_header -def validate_openapi_security_parameters( +def _validate_openapi_security_parameters( security: List[Dict[str, List[str]]], security_schemes: Optional[Dict[str, "SecurityScheme"]], ) -> bool: """ - Validates the security parameters based on the provided security schemes. + This function checks if all security requirements listed in the 'security' + parameter are defined in the 'security_schemes' dictionary, as specified + in the OpenAPI schema. Parameters ---------- @@ -88,11 +90,11 @@ def validate_openapi_security_parameters( Returns ------- bool - True if all security scheme names in the `security` parameter are present in the `security_schemes` parameter, - False otherwise. - + Whether list of security schemes match allowed security_schemes. """ - return bool( - security_schemes and all(key in security_schemes for sec in security for key in sec), - ) + security_schemes = security_schemes or {} + + security_schema_match = all(key in security_schemes for sec in security for key in sec) + + return bool(security_schema_match and security_schemes) diff --git a/tests/functional/event_handler/test_openapi_security.py b/tests/functional/event_handler/test_openapi_security.py index f72b9362ed0..9f7cc1c536d 100644 --- a/tests/functional/event_handler/test_openapi_security.py +++ b/tests/functional/event_handler/test_openapi_security.py @@ -49,7 +49,7 @@ def handler(): raise NotImplementedError() # WHEN the get_openapi_schema method is called with security defined security schemes as APIKey - # WHEN top level security is defined as HTTPBearer + # AND top level security is defined as HTTPBearer # THEN a SchemaValidationError should be raised with pytest.raises(SchemaValidationError): app.get_openapi_schema( @@ -80,7 +80,7 @@ def test_openapi_operation_level_security_missing(): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() - # WHEN we define a security in operation + # AND a route with a security scheme defined @app.get("/", security=[{"apiKey": []}]) def handler(): raise NotImplementedError() @@ -95,7 +95,7 @@ def test_openapi_operation_level_security_mismatch(security_scheme): # GIVEN an APIGatewayRestResolver instance app = APIGatewayRestResolver() - # WHEN we define a security in operation with value HTTPBearer + # AND a route with a security scheme using HTTPBearer @app.get("/", security=[{"HTTPBearer": []}]) def handler(): raise NotImplementedError() From 093905bffa89291b2db767d3af14c1c4b97d7dcc Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 7 Jun 2024 13:37:24 +0100 Subject: [PATCH 08/10] Addressing Heitor's feedback --- aws_lambda_powertools/event_handler/api_gateway.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/event_handler/api_gateway.py b/aws_lambda_powertools/event_handler/api_gateway.py index d9735af4434..f82532f1f71 100644 --- a/aws_lambda_powertools/event_handler/api_gateway.py +++ b/aws_lambda_powertools/event_handler/api_gateway.py @@ -1600,7 +1600,8 @@ def get_openapi_schema( security_schemes=security_schemes, ): raise SchemaValidationError( - "Security configuration was not found in security_schemas or security_schema was not defined.", + "Security configuration was not found in security_schemas or security_schema was not defined. " + "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes", ) if not route.include_in_schema: @@ -1651,7 +1652,8 @@ def _get_openapi_security( if not _validate_openapi_security_parameters(security=security, security_schemes=security_schemes): raise SchemaValidationError( - "Security configuration was not found in security_schemas or security_schema was not defined.", + "Security configuration was not found in security_schemas or security_schema was not defined. " + "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes", ) return security From b9fb77b7c7576b9314609ef748bd1e1b6554bbd3 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 7 Jun 2024 16:56:43 +0100 Subject: [PATCH 09/10] Addressing Heitor's feedback --- aws_lambda_powertools/event_handler/util.py | 2 +- docs/core/event_handler/api_gateway.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/event_handler/util.py b/aws_lambda_powertools/event_handler/util.py index 1474961af55..60cb0f87b57 100644 --- a/aws_lambda_powertools/event_handler/util.py +++ b/aws_lambda_powertools/event_handler/util.py @@ -73,7 +73,7 @@ def extract_origin_header(resolver_headers: Dict[str, Any]): def _validate_openapi_security_parameters( security: List[Dict[str, List[str]]], - security_schemes: Optional[Dict[str, "SecurityScheme"]], + security_schemes: Optional[Dict[str, "SecurityScheme"]] = None, ) -> bool: """ This function checks if all security requirements listed in the 'security' diff --git a/docs/core/event_handler/api_gateway.md b/docs/core/event_handler/api_gateway.md index aa667f5f169..2b70fcfad05 100644 --- a/docs/core/event_handler/api_gateway.md +++ b/docs/core/event_handler/api_gateway.md @@ -1033,7 +1033,7 @@ Below is an example configuration for serving Swagger UI from a custom path or C No. Powertools adds support for generating OpenAPI documentation with [security schemes](https://swagger.io/docs/specification/authentication/), but it doesn't implement any of the security schemes itself, so you must implement the security mechanisms separately. OpenAPI uses the term security scheme for [authentication and authorization schemes](https://swagger.io/docs/specification/authentication/){target="_blank"}. -When you're describing your API, declare security schemes at the top level, and reference them globally or per operation. +When you're describing your API, declare security schemes at the top level, and reference them globally or per operation. Remember to have the security configurations defined in the `security_schemes`, otherwise a `SchemaValidationError` will be raised. === "Global OpenAPI security schemes" From e27bebfe54db84f3564f594920b8e19b40b3dee6 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 7 Jun 2024 17:16:51 +0100 Subject: [PATCH 10/10] Addressing Heitor's feedback --- docs/core/event_handler/api_gateway.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/core/event_handler/api_gateway.md b/docs/core/event_handler/api_gateway.md index 2b70fcfad05..0725ff6554c 100644 --- a/docs/core/event_handler/api_gateway.md +++ b/docs/core/event_handler/api_gateway.md @@ -1032,8 +1032,7 @@ Below is an example configuration for serving Swagger UI from a custom path or C ???-info "Does Powertools implement any of the security schemes?" No. Powertools adds support for generating OpenAPI documentation with [security schemes](https://swagger.io/docs/specification/authentication/), but it doesn't implement any of the security schemes itself, so you must implement the security mechanisms separately. -OpenAPI uses the term security scheme for [authentication and authorization schemes](https://swagger.io/docs/specification/authentication/){target="_blank"}. -When you're describing your API, declare security schemes at the top level, and reference them globally or per operation. Remember to have the security configurations defined in the `security_schemes`, otherwise a `SchemaValidationError` will be raised. +Security schemes are declared at the top-level first. You can reference them globally or on a per path _(operation)_ level. **However**, if you reference security schemes that are not defined at the top-level it will lead to a `SchemaValidationError` _(invalid OpenAPI spec)_. === "Global OpenAPI security schemes"