diff --git a/.flake8 b/.flake8 index 6c0c78fa967..c7d6486ed48 100644 --- a/.flake8 +++ b/.flake8 @@ -3,6 +3,9 @@ exclude = docs, .eggs, setup.py, example, .aws-sam, .git, dist, *.md, *.yaml, ex ignore = E203, E266, W503, BLK100, W291, I004 max-line-length = 120 max-complexity = 15 +per-file-ignores = + tests/e2e/utils/data_builder/__init__.py:F401 + tests/e2e/utils/data_fetcher/__init__.py:F401 [isort] multi_line_output = 3 diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 4be6a26c6a6..3865be4d3e7 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -5,6 +5,8 @@ import boto3 +from tests.e2e.utils import data_fetcher, infrastructure + # We only need typing_extensions for python versions <3.8 if sys.version_info >= (3, 8): from typing import TypedDict @@ -14,7 +16,6 @@ from typing import Dict, Generator, Optional import pytest -from e2e.utils import helpers, infrastructure class LambdaConfig(TypedDict): @@ -61,5 +62,5 @@ def execute_lambda(create_infrastructure) -> InfrastructureOutput: session = boto3.Session() client = session.client("lambda") for _, arn in create_infrastructure.items(): - helpers.trigger_lambda(lambda_arn=arn, client=client) + data_fetcher.get_lambda_response(lambda_arn=arn, client=client) return InfrastructureOutput(arns=create_infrastructure, execution_time=execution_time) diff --git a/tests/e2e/logger/test_logger.py b/tests/e2e/logger/test_logger.py index ea27b93740b..992cf779275 100644 --- a/tests/e2e/logger/test_logger.py +++ b/tests/e2e/logger/test_logger.py @@ -1,7 +1,8 @@ import boto3 import pytest from e2e import conftest -from e2e.utils import helpers + +from tests.e2e.utils import data_fetcher @pytest.fixture(scope="module") @@ -23,7 +24,7 @@ def test_basic_lambda_logs_visible(execute_lambda: conftest.InfrastructureOutput cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert any( @@ -42,7 +43,7 @@ def test_basic_lambda_no_debug_logs_visible( cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert not any( @@ -66,7 +67,7 @@ def test_basic_lambda_contextual_data_logged(execute_lambda: conftest.Infrastruc cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert all(keys in logs.dict(exclude_unset=True) for logs in filtered_logs for keys in required_keys) @@ -81,7 +82,7 @@ def test_basic_lambda_additional_key_persistence_basic_lambda( cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert any( @@ -100,7 +101,7 @@ def test_basic_lambda_empty_event_logged(execute_lambda: conftest.Infrastructure cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, 
log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert any(log.message == {} for log in filtered_logs) @@ -122,7 +123,7 @@ def test_no_context_lambda_contextual_data_not_logged(execute_lambda: conftest.I cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert not any(keys in logs.dict(exclude_unset=True) for logs in filtered_logs for keys in required_missing_keys) @@ -136,7 +137,7 @@ def test_no_context_lambda_event_not_logged(execute_lambda: conftest.Infrastruct cw_client = boto3.client("logs") # WHEN - filtered_logs = helpers.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) + filtered_logs = data_fetcher.get_logs(lambda_function_name=lambda_name, start_time=timestamp, log_client=cw_client) # THEN assert not any(log.message == {} for log in filtered_logs) diff --git a/tests/e2e/metrics/conftest.py b/tests/e2e/metrics/conftest.py index 0f4ca8e58c2..18f4564e714 100644 --- a/tests/e2e/metrics/conftest.py +++ b/tests/e2e/metrics/conftest.py @@ -10,8 +10,12 @@ def infrastructure(request: pytest.FixtureRequest, tmp_path_factory: pytest.Temp Parameters ---------- - request : fixtures.SubRequest - test fixture containing metadata about test execution + request : pytest.FixtureRequest + pytest request fixture to introspect absolute path to test being executed + tmp_path_factory : pytest.TempPathFactory + pytest temporary path factory to discover shared tmp when multiple CPU processes are spun up + worker_id : str + pytest-xdist worker identification to detect whether parallelization is enabled Yields ------ diff --git a/tests/e2e/metrics/test_metrics.py b/tests/e2e/metrics/test_metrics.py index f1a31bb3c82..01d1ba2fbf1 100644 --- a/tests/e2e/metrics/test_metrics.py +++ b/tests/e2e/metrics/test_metrics.py @@ -2,7 +2,7 @@ import pytest -from tests.e2e.utils import helpers +from tests.e2e.utils import data_builder, data_fetcher @pytest.fixture @@ -30,40 +30,40 @@ def cold_start_fn_arn(infrastructure: dict) -> str: def test_basic_lambda_metric_is_visible(basic_handler_fn: str, basic_handler_fn_arn: str): # GIVEN - metric_name = helpers.build_metric_name() - service = helpers.build_service_name() - dimensions = helpers.build_add_dimensions_input(service=service) - metrics = helpers.build_multiple_add_metric_input(metric_name=metric_name, value=1, quantity=3) + metric_name = data_builder.build_metric_name() + service = data_builder.build_service_name() + dimensions = data_builder.build_add_dimensions_input(service=service) + metrics = data_builder.build_multiple_add_metric_input(metric_name=metric_name, value=1, quantity=3) # WHEN event = json.dumps({"metrics": metrics, "service": service, "namespace": METRIC_NAMESPACE}) - _, execution_time = helpers.trigger_lambda(lambda_arn=basic_handler_fn_arn, payload=event) + _, execution_time = data_fetcher.get_lambda_response(lambda_arn=basic_handler_fn_arn, payload=event) - metrics = helpers.get_metrics( + my_metrics = data_fetcher.get_metrics( namespace=METRIC_NAMESPACE, start_date=execution_time, metric_name=metric_name, dimensions=dimensions ) # THEN - metric_data = metrics.get("Values", []) + metric_data = my_metrics.get("Values", []) assert metric_data and metric_data[0] == 3.0 def 
test_cold_start_metric(cold_start_fn_arn: str, cold_start_fn: str): # GIVEN metric_name = "ColdStart" - service = helpers.build_service_name() - dimensions = helpers.build_add_dimensions_input(function_name=cold_start_fn, service=service) + service = data_builder.build_service_name() + dimensions = data_builder.build_add_dimensions_input(function_name=cold_start_fn, service=service) # WHEN we invoke twice event = json.dumps({"service": service, "namespace": METRIC_NAMESPACE}) - _, execution_time = helpers.trigger_lambda(lambda_arn=cold_start_fn_arn, payload=event) - _, _ = helpers.trigger_lambda(lambda_arn=cold_start_fn_arn, payload=event) + _, execution_time = data_fetcher.get_lambda_response(lambda_arn=cold_start_fn_arn, payload=event) + _, _ = data_fetcher.get_lambda_response(lambda_arn=cold_start_fn_arn, payload=event) - metrics = helpers.get_metrics( + my_metrics = data_fetcher.get_metrics( namespace=METRIC_NAMESPACE, start_date=execution_time, metric_name=metric_name, dimensions=dimensions ) # THEN - metric_data = metrics.get("Values", []) + metric_data = my_metrics.get("Values", []) assert metric_data and metric_data[0] == 1.0 diff --git a/tests/e2e/tracer/conftest.py b/tests/e2e/tracer/conftest.py new file mode 100644 index 00000000000..599d7ab4ca8 --- /dev/null +++ b/tests/e2e/tracer/conftest.py @@ -0,0 +1,25 @@ +import pytest + +from tests.e2e.tracer.infrastructure import TracerStack +from tests.e2e.utils.infrastructure import deploy_once + + +@pytest.fixture(autouse=True, scope="module") +def infrastructure(request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory, worker_id: str): + """Setup and teardown logic for E2E test infrastructure + + Parameters + ---------- + request : pytest.FixtureRequest + pytest request fixture to introspect absolute path to test being executed + tmp_path_factory : pytest.TempPathFactory + pytest temporary path factory to discover shared tmp when multiple CPU processes are spun up + worker_id : str + pytest-xdist worker identification to detect whether parallelization is enabled + + Yields + ------ + Dict[str, str] + CloudFormation Outputs from deployed infrastructure + """ + yield from deploy_once(stack=TracerStack, request=request, tmp_path_factory=tmp_path_factory, worker_id=worker_id) diff --git a/tests/e2e/tracer/handlers/async_capture.py b/tests/e2e/tracer/handlers/async_capture.py new file mode 100644 index 00000000000..b19840a6f69 --- /dev/null +++ b/tests/e2e/tracer/handlers/async_capture.py @@ -0,0 +1,16 @@ +import asyncio +from uuid import uuid4 + +from aws_lambda_powertools import Tracer +from aws_lambda_powertools.utilities.typing import LambdaContext + +tracer = Tracer() + + +@tracer.capture_method +async def async_get_users(): + return [{"id": f"{uuid4()}"} for _ in range(5)] + + +def lambda_handler(event: dict, context: LambdaContext): + return asyncio.run(async_get_users()) diff --git a/tests/e2e/tracer/handlers/basic_handler.py b/tests/e2e/tracer/handlers/basic_handler.py index d074b30796f..ba94c845ace 100644 --- a/tests/e2e/tracer/handlers/basic_handler.py +++ b/tests/e2e/tracer/handlers/basic_handler.py @@ -1,25 +1,16 @@ -import asyncio -import os +from uuid import uuid4 from aws_lambda_powertools import Tracer from aws_lambda_powertools.utilities.typing import LambdaContext -tracer = Tracer(service="e2e-tests-app") +tracer = Tracer() -ANNOTATION_KEY = os.environ["ANNOTATION_KEY"] -ANNOTATION_VALUE = os.environ["ANNOTATION_VALUE"] -ANNOTATION_ASYNC_VALUE = os.environ["ANNOTATION_ASYNC_VALUE"] + 
+@tracer.capture_method +def get_todos(): + return [{"id": f"{uuid4()}", "completed": False} for _ in range(5)] @tracer.capture_lambda_handler def lambda_handler(event: dict, context: LambdaContext): - tracer.put_annotation(key=ANNOTATION_KEY, value=ANNOTATION_VALUE) - tracer.put_metadata(key=ANNOTATION_KEY, value=ANNOTATION_VALUE) - return asyncio.run(collect_payment()) - - -@tracer.capture_method -async def collect_payment() -> str: - tracer.put_annotation(key=ANNOTATION_KEY, value=ANNOTATION_ASYNC_VALUE) - tracer.put_metadata(key=ANNOTATION_KEY, value=ANNOTATION_ASYNC_VALUE) - return "success" + return get_todos() diff --git a/tests/e2e/tracer/infrastructure.py b/tests/e2e/tracer/infrastructure.py new file mode 100644 index 00000000000..bd40fd2ca13 --- /dev/null +++ b/tests/e2e/tracer/infrastructure.py @@ -0,0 +1,17 @@ +from pathlib import Path + +from tests.e2e.utils.data_builder import build_service_name +from tests.e2e.utils.infrastructure import BaseInfrastructureV2 + + +class TracerStack(BaseInfrastructureV2): + # Maintenance: Tracer doesn't support dynamic service injection (tracer.py L310) + # we could move after handler response or adopt env vars usage in e2e tests + SERVICE_NAME: str = build_service_name() + + def __init__(self, handlers_dir: Path, feature_name: str = "tracer") -> None: + super().__init__(feature_name, handlers_dir) + + def create_resources(self) -> None: + env_vars = {"POWERTOOLS_SERVICE_NAME": self.SERVICE_NAME} + self.create_lambda_functions(function_props={"environment": env_vars}) diff --git a/tests/e2e/tracer/test_tracer.py b/tests/e2e/tracer/test_tracer.py index c2af4386749..06dde811ef1 100644 --- a/tests/e2e/tracer/test_tracer.py +++ b/tests/e2e/tracer/test_tracer.py @@ -1,51 +1,69 @@ -import datetime -import uuid - -import boto3 import pytest -from e2e import conftest -from e2e.utils import helpers + +from tests.e2e.tracer.handlers import async_capture, basic_handler +from tests.e2e.tracer.infrastructure import TracerStack +from tests.e2e.utils import data_builder, data_fetcher + + +@pytest.fixture +def basic_handler_fn_arn(infrastructure: dict) -> str: + return infrastructure.get("BasicHandlerArn", "") + + +@pytest.fixture +def basic_handler_fn(infrastructure: dict) -> str: + return infrastructure.get("BasicHandler", "") + + +@pytest.fixture +def async_fn_arn(infrastructure: dict) -> str: + return infrastructure.get("AsyncCaptureArn", "") + + +@pytest.fixture +def async_fn(infrastructure: dict) -> str: + return infrastructure.get("AsyncCapture", "") -@pytest.fixture(scope="module") -def config() -> conftest.LambdaConfig: - return { - "parameters": {"tracing": "ACTIVE"}, - "environment_variables": { - "ANNOTATION_KEY": f"e2e-tracer-{str(uuid.uuid4()).replace('-','_')}", - "ANNOTATION_VALUE": "stored", - "ANNOTATION_ASYNC_VALUE": "payments", - }, - } +def test_lambda_handler_trace_is_visible(basic_handler_fn_arn: str, basic_handler_fn: str): + # GIVEN + handler_name = basic_handler.lambda_handler.__name__ + handler_subsegment = f"## {handler_name}" + handler_metadata_key = f"{handler_name} response" + + method_name = basic_handler.get_todos.__name__ + method_subsegment = f"## {method_name}" + method_metadata_key = f"{method_name} response" + + trace_query = data_builder.build_trace_default_query(function_name=basic_handler_fn) + + # WHEN + _, execution_time = data_fetcher.get_lambda_response(lambda_arn=basic_handler_fn_arn) + data_fetcher.get_lambda_response(lambda_arn=basic_handler_fn_arn) + + # THEN + trace =
data_fetcher.get_traces(start_date=execution_time, filter_expression=trace_query, minimum_traces=2) + + assert len(trace.get_annotation(key="ColdStart", value=True)) == 1 + assert len(trace.get_metadata(key=handler_metadata_key, namespace=TracerStack.SERVICE_NAME)) == 2 + assert len(trace.get_metadata(key=method_metadata_key, namespace=TracerStack.SERVICE_NAME)) == 2 + assert len(trace.get_subsegment(name=handler_subsegment)) == 2 + assert len(trace.get_subsegment(name=method_subsegment)) == 2 -def test_basic_lambda_async_trace_visible(execute_lambda: conftest.InfrastructureOutput, config: conftest.LambdaConfig): +def test_async_trace_is_visible(async_fn_arn: str, async_fn: str): # GIVEN - lambda_name = execute_lambda.get_lambda_function_name(cf_output_name="basichandlerarn") - start_date = execute_lambda.get_lambda_execution_time() - end_date = start_date + datetime.timedelta(minutes=5) - trace_filter_expression = f'service("{lambda_name}")' + async_fn_name = async_capture.async_get_users.__name__ + async_fn_name_subsegment = f"## {async_fn_name}" + async_fn_name_metadata_key = f"{async_fn_name} response" + + trace_query = data_builder.build_trace_default_query(function_name=async_fn) # WHEN - trace = helpers.get_traces( - start_date=start_date, - end_date=end_date, - filter_expression=trace_filter_expression, - xray_client=boto3.client("xray"), - ) + _, execution_time = data_fetcher.get_lambda_response(lambda_arn=async_fn_arn) # THEN - info = helpers.find_trace_additional_info(trace=trace) - print(info) - handler_trace_segment = [trace_segment for trace_segment in info if trace_segment.name == "## lambda_handler"][0] - collect_payment_trace_segment = [ - trace_segment for trace_segment in info if trace_segment.name == "## collect_payment" - ][0] - - annotation_key = config["environment_variables"]["ANNOTATION_KEY"] - expected_value = config["environment_variables"]["ANNOTATION_VALUE"] - expected_async_value = config["environment_variables"]["ANNOTATION_ASYNC_VALUE"] - - assert handler_trace_segment.annotations["Service"] == "e2e-tests-app" - assert handler_trace_segment.metadata["e2e-tests-app"][annotation_key] == expected_value - assert collect_payment_trace_segment.metadata["e2e-tests-app"][annotation_key] == expected_async_value + trace = data_fetcher.get_traces(start_date=execution_time, filter_expression=trace_query) + + assert len(trace.get_subsegment(name=async_fn_name_subsegment)) == 1 + assert len(trace.get_metadata(key=async_fn_name_metadata_key, namespace=TracerStack.SERVICE_NAME)) == 1 diff --git a/tests/e2e/utils/asset.py b/tests/e2e/utils/asset.py index 0bc7b5dfabe..04d368a6ff4 100644 --- a/tests/e2e/utils/asset.py +++ b/tests/e2e/utils/asset.py @@ -2,18 +2,45 @@ import json import zipfile from pathlib import Path -from typing import List, Optional +from typing import Dict, List, Optional import boto3 import botocore.exceptions from mypy_boto3_s3 import S3Client +from pydantic import BaseModel, Field from aws_lambda_powertools import Logger -from tests.e2e.utils.models import AssetTemplateConfig, TemplateAssembly logger = Logger(service="e2e-utils") +class AssetManifest(BaseModel): + path: str + packaging: str + + +class AssetTemplateConfigDestinationsAccount(BaseModel): + bucket_name: str = Field(..., alias="bucketName") + object_key: str = Field(..., alias="objectKey") + assume_role_arn: str = Field(..., alias="assumeRoleArn") + + +class AssetTemplateConfigDestinations(BaseModel): + current_account_current_region: AssetTemplateConfigDestinationsAccount = Field(
..., alias="current_account-current_region" + ) + + +class AssetTemplateConfig(BaseModel): + source: AssetManifest + destinations: AssetTemplateConfigDestinations + + +class TemplateAssembly(BaseModel): + version: str + files: Dict[str, AssetTemplateConfig] + + class Asset: def __init__( self, config: AssetTemplateConfig, account_id: str, region: str, boto3_client: Optional[S3Client] = None diff --git a/tests/e2e/utils/data_builder/__init__.py b/tests/e2e/utils/data_builder/__init__.py new file mode 100644 index 00000000000..72c216faa76 --- /dev/null +++ b/tests/e2e/utils/data_builder/__init__.py @@ -0,0 +1,13 @@ +from tests.e2e.utils.data_builder.common import build_random_value, build_service_name +from tests.e2e.utils.data_builder.metrics import ( + build_add_dimensions_input, + build_add_metric_input, + build_metric_name, + build_metric_query_data, + build_multiple_add_metric_input, +) +from tests.e2e.utils.data_builder.traces import ( + build_put_annotations_input, + build_put_metadata_input, + build_trace_default_query, +) diff --git a/tests/e2e/utils/data_builder/common.py b/tests/e2e/utils/data_builder/common.py new file mode 100644 index 00000000000..f28778ffed3 --- /dev/null +++ b/tests/e2e/utils/data_builder/common.py @@ -0,0 +1,9 @@ +import secrets + + +def build_service_name() -> str: + return f"test_service{build_random_value()}" + + +def build_random_value(nbytes: int = 10) -> str: + return secrets.token_urlsafe(nbytes).replace("-", "") diff --git a/tests/e2e/utils/data_builder/metrics.py b/tests/e2e/utils/data_builder/metrics.py new file mode 100644 index 00000000000..d14f4ae3567 --- /dev/null +++ b/tests/e2e/utils/data_builder/metrics.py @@ -0,0 +1,116 @@ +from typing import Dict, List, Optional + +from mypy_boto3_cloudwatch.type_defs import DimensionTypeDef, MetricDataQueryTypeDef + +from aws_lambda_powertools.metrics import MetricUnit +from tests.e2e.utils.data_builder.common import build_random_value + + +def build_metric_query_data( + namespace: str, + metric_name: str, + period: int = 60, + stat: str = "Sum", + dimensions: Optional[List[DimensionTypeDef]] = None, +) -> List[MetricDataQueryTypeDef]: + """Create input for CloudWatch GetMetricData API call + + Parameters + ---------- + namespace : str + Metric namespace to search for + metric_name : str + Metric name to search for + period : int, optional + Time period in seconds to search metrics, by default 60 + stat : str, optional + Aggregate function to use for results, by default "Sum" + dimensions : Optional[List[DimensionTypeDef]], optional + Metric dimensions to search for, by default None + + Returns + ------- + List[MetricDataQueryTypeDef] + List of metric data queries for the CloudWatch GetMetricData API call + """ + dimensions = dimensions or [] + data_query: List[MetricDataQueryTypeDef] = [ + { + "Id": metric_name.lower(), + "MetricStat": { + "Metric": {"Namespace": namespace, "MetricName": metric_name}, + "Period": period, + "Stat": stat, + }, + "ReturnData": True, + } + ] + + if dimensions: + data_query[0]["MetricStat"]["Metric"]["Dimensions"] = dimensions + + return data_query + + +def build_add_metric_input(metric_name: str, value: float, unit: str = MetricUnit.Count.value) -> Dict: + """Create a metric input to be used with Metrics.add_metric() + + Parameters + ---------- + metric_name : str + metric name + value : float + metric value + unit : str, optional + metric unit, by default Count + + Returns + ------- + Dict + Metric input + """ + return {"name": metric_name, "unit": unit, "value": value} + + +def
build_multiple_add_metric_input( + metric_name: str, value: float, unit: str = MetricUnit.Count.value, quantity: int = 1 +) -> List[Dict]: + """Create list of metrics input to be used with Metrics.add_metric() + + Parameters + ---------- + metric_name : str + metric name + value : float + metric value + unit : str, optional + metric unit, by default Count + quantity : int, optional + number of metrics to be created, by default 1 + + Returns + ------- + List[Dict] + List of metrics + """ + return [{"name": metric_name, "unit": unit, "value": value} for _ in range(quantity)] + + +def build_add_dimensions_input(**dimensions) -> List[DimensionTypeDef]: + """Create dimensions input to be used with either get_metrics or Metrics.add_dimension() + + Parameters + ---------- + dimensions : str + key=value pair as dimension + + Returns + ------- + List[DimensionTypeDef] + Metric dimension input + """ + return [{"Name": name, "Value": value} for name, value in dimensions.items()] + + +def build_metric_name() -> str: + return f"test_metric{build_random_value()}" diff --git a/tests/e2e/utils/data_builder/traces.py b/tests/e2e/utils/data_builder/traces.py new file mode 100644 index 00000000000..59350c8ff68 --- /dev/null +++ b/tests/e2e/utils/data_builder/traces.py @@ -0,0 +1,39 @@ +from typing import Any, Dict, List, Optional + + +def build_trace_default_query(function_name: str) -> str: + return f'service("{function_name}")' + + +def build_put_annotations_input(**annotations: str) -> List[Dict]: + """Create trace annotations input to be used with Tracer.put_annotation() + + Parameters + ---------- + annotations : str + annotations in key=value form + + Returns + ------- + List[Dict] + List of put annotations input + """ + return [{"key": key, "value": value} for key, value in annotations.items()] + + +def build_put_metadata_input(namespace: Optional[str] = None, **metadata: Any) -> List[Dict]: + """Create trace metadata input to be used with Tracer.put_metadata() + + Parameters + ---------- + namespace : Optional[str], optional + namespace to nest metadata under, by default None + metadata : Any + metadata in key=value form + + Returns + ------- + List[Dict] + List of put metadata input + """ + return [{"key": key, "value": value, "namespace": namespace} for key, value in metadata.items()] diff --git a/tests/e2e/utils/data_fetcher/__init__.py b/tests/e2e/utils/data_fetcher/__init__.py new file mode 100644 index 00000000000..43024f9946f --- /dev/null +++ b/tests/e2e/utils/data_fetcher/__init__.py @@ -0,0 +1,4 @@ +from tests.e2e.utils.data_fetcher.common import get_lambda_response +from tests.e2e.utils.data_fetcher.logs import get_logs +from tests.e2e.utils.data_fetcher.metrics import get_metrics +from tests.e2e.utils.data_fetcher.traces import get_traces diff --git a/tests/e2e/utils/data_fetcher/common.py b/tests/e2e/utils/data_fetcher/common.py new file mode 100644 index 00000000000..2de8838dc74 --- /dev/null +++ b/tests/e2e/utils/data_fetcher/common.py @@ -0,0 +1,15 @@ +from datetime import datetime +from typing import Optional, Tuple + +import boto3 +from mypy_boto3_lambda import LambdaClient +from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef + + +def get_lambda_response( + lambda_arn: str, payload: Optional[str] = None, client: Optional[LambdaClient] = None +) -> Tuple[InvocationResponseTypeDef, datetime]: + client = client or boto3.client("lambda") + payload = payload or "" + execution_time = datetime.utcnow() + return client.invoke(FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload), execution_time
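Note: the data_builder and data_fetcher modules above are designed to compose in tests. A minimal sketch of the intended flow, assuming a deployed function ARN supplied by a fixture (the handler_arn parameter and "ExampleNamespace" are illustrative, not part of this diff):

import json

from tests.e2e.utils import data_builder, data_fetcher


def example_metric_flow(handler_arn: str) -> None:
    # Randomized metric/service names avoid collisions across parallel (xdist) runs
    metric_name = data_builder.build_metric_name()
    service = data_builder.build_service_name()
    dimensions = data_builder.build_add_dimensions_input(service=service)
    metrics = data_builder.build_multiple_add_metric_input(metric_name=metric_name, value=1, quantity=3)

    # Invoke and keep the invocation timestamp; fetchers use it as the start of their search window
    event = json.dumps({"metrics": metrics, "service": service, "namespace": "ExampleNamespace"})
    _, execution_time = data_fetcher.get_lambda_response(lambda_arn=handler_arn, payload=event)

    # get_metrics (defined below) retries on empty results to absorb CloudWatch eventual consistency
    result = data_fetcher.get_metrics(
        namespace="ExampleNamespace", start_date=execution_time, metric_name=metric_name, dimensions=dimensions
    )
    assert result.get("Values", [])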
diff --git a/tests/e2e/utils/data_fetcher/logs.py b/tests/e2e/utils/data_fetcher/logs.py new file mode 100644 index 00000000000..e8211eeea30 --- /dev/null +++ b/tests/e2e/utils/data_fetcher/logs.py @@ -0,0 +1,39 @@ +import json +from functools import lru_cache +from typing import List, Optional, Union + +from mypy_boto3_cloudwatch import CloudWatchClient +from pydantic import BaseModel +from retry import retry + + +class Log(BaseModel): + level: str + location: str + message: Union[dict, str] + timestamp: str + service: str + cold_start: Optional[bool] + function_name: Optional[str] + function_memory_size: Optional[str] + function_arn: Optional[str] + function_request_id: Optional[str] + xray_trace_id: Optional[str] + extra_info: Optional[str] + + +@lru_cache(maxsize=10, typed=False) +@retry(ValueError, delay=1, jitter=1, tries=20) +def get_logs(lambda_function_name: str, log_client: CloudWatchClient, start_time: int, **kwargs: dict) -> List[Log]: + response = log_client.filter_log_events(logGroupName=f"/aws/lambda/{lambda_function_name}", startTime=start_time) + if not response["events"]: + raise ValueError("Empty response from Cloudwatch Logs. Repeating...") + filtered_logs = [] + for event in response["events"]: + try: + message = Log(**json.loads(event["message"])) + except json.decoder.JSONDecodeError: + continue + filtered_logs.append(message) + + return filtered_logs diff --git a/tests/e2e/utils/data_fetcher/metrics.py b/tests/e2e/utils/data_fetcher/metrics.py new file mode 100644 index 00000000000..5a017f0a845 --- /dev/null +++ b/tests/e2e/utils/data_fetcher/metrics.py @@ -0,0 +1,71 @@ +from datetime import datetime, timedelta +from typing import List, Optional + +import boto3 +from mypy_boto3_cloudwatch import CloudWatchClient +from mypy_boto3_cloudwatch.type_defs import DimensionTypeDef, MetricDataResultTypeDef +from retry import retry + +from tests.e2e.utils.data_builder import build_metric_query_data + + +@retry(ValueError, delay=2, jitter=1.5, tries=10) +def get_metrics( + namespace: str, + start_date: datetime, + metric_name: str, + dimensions: Optional[List[DimensionTypeDef]] = None, + cw_client: Optional[CloudWatchClient] = None, + end_date: Optional[datetime] = None, + period: int = 60, + stat: str = "Sum", +) -> MetricDataResultTypeDef: + """Fetch CloudWatch Metrics + + It takes into account eventual consistency with up to 10 retries and 1.5s jitter. 
+ + Parameters + ---------- + namespace : str + Metric Namespace + start_date : datetime + Start window to fetch metrics + metric_name : str + Metric name + dimensions : Optional[List[DimensionTypeDef]], optional + List of Metric Dimensions, by default None + cw_client : Optional[CloudWatchClient], optional + Boto3 CloudWatch low-level client (boto3.client("cloudwatch")), by default None + end_date : Optional[datetime], optional + End window to fetch metrics, by default 2 minutes past start_date + period : int, optional + Time period to fetch metrics for, by default 60 + stat : str, optional + Aggregation function to use when fetching metrics, by default "Sum" + + Returns + ------- + MetricDataResultTypeDef + Dict with metric values found + + Raises + ------ + ValueError + When no metric is found within retry window + """ + cw_client = cw_client or boto3.client("cloudwatch") + end_date = end_date or start_date + timedelta(minutes=2) + + metric_query = build_metric_query_data( + namespace=namespace, metric_name=metric_name, period=period, stat=stat, dimensions=dimensions + ) + + response = cw_client.get_metric_data( + MetricDataQueries=metric_query, + StartTime=start_date, + EndTime=end_date, + ) + result = response["MetricDataResults"][0] + if not result["Values"]: + raise ValueError("Empty response from Cloudwatch. Repeating...") + return result diff --git a/tests/e2e/utils/data_fetcher/traces.py b/tests/e2e/utils/data_fetcher/traces.py new file mode 100644 index 00000000000..827109112df --- /dev/null +++ b/tests/e2e/utils/data_fetcher/traces.py @@ -0,0 +1,267 @@ +import json +from datetime import datetime, timedelta +from typing import Any, Dict, Generator, List, Optional + +import boto3 +from botocore.paginate import PageIterator +from mypy_boto3_xray.client import XRayClient +from mypy_boto3_xray.type_defs import TraceSummaryTypeDef +from pydantic import BaseModel +from retry import retry + + +class TraceSubsegment(BaseModel): + id: str # noqa: A003 VNE003 # id is a field we can't change + name: str + start_time: float + end_time: float + aws: Optional[dict] + subsegments: Optional[List["TraceSubsegment"]] + annotations: Optional[Dict[str, Any]] + metadata: Optional[Dict[str, Dict[str, Any]]] + + +class TraceDocument(BaseModel): + id: str # noqa: A003 VNE003 # id is a field we can't change + name: str + start_time: float + end_time: float + trace_id: str + parent_id: Optional[str] + aws: Dict + origin: str + subsegments: Optional[List[TraceSubsegment]] + + +class TraceFetcher: + default_exclude_seg_name: List = ["Initialization", "Invocation", "Overhead"] + + def __init__( + self, + filter_expression: str, + start_date: datetime, + end_date: Optional[datetime] = None, + xray_client: Optional[XRayClient] = None, + exclude_segment_name: Optional[List[str]] = None, + resource_name: Optional[List[str]] = None, + origin: Optional[List[str]] = None, + minimum_traces: int = 1, + ): + """Fetch and expose traces from X-Ray based on parameters + + Data is recursively fetched in the following order: + + * Trace summaries + * Trace IDs + * Traces + * Segments + * Subsegments + * Nested Subsegments + + Parameters + ---------- + filter_expression : str + AWS X-Ray Filter Expressions + see: https://docs.aws.amazon.com/xray/latest/devguide/xray-console-filters.html + start_date : datetime + Start date range to filter traces + end_date : Optional[datetime], optional + End date range to filter traces, by default 5 minutes past start_date + xray_client : Optional[XRayClient],
optional + AWS X-Ray SDK Client, by default boto3.client('xray') + exclude_segment_name : Optional[List[str]], optional + Name of segments to exclude, by default ["Initialization", "Invocation", "Overhead"] + resource_name : Optional[List[str]], optional + Name of resource to filter traces (e.g., function name), by default None + origin : Optional[List[str]], optional + Trace origin name to filter traces, by default ["AWS::Lambda::Function"] + minimum_traces : int + Minimum number of traces to be retrieved before exhausting retry attempts + """ + self.filter_expression = filter_expression + self.start_date = start_date + self.end_date = end_date or self.start_date + timedelta(minutes=5) + self.xray_client: XRayClient = xray_client or boto3.client("xray") + self.trace_documents: Dict[str, TraceDocument] = {} + self.subsegments: List[TraceSubsegment] = [] + self.exclude_segment_name = exclude_segment_name or self.default_exclude_seg_name + self.resource_name = resource_name + self.origin = origin or ["AWS::Lambda::Function"] + self.annotations: List[Dict[str, Any]] = [] + self.metadata: List[Dict[str, Dict[str, Any]]] = [] + self.minimum_traces = minimum_traces + + paginator = self.xray_client.get_paginator("get_trace_summaries") + pages = paginator.paginate( + StartTime=self.start_date, + EndTime=self.end_date, + TimeRangeType="Event", + Sampling=False, + FilterExpression=self.filter_expression, + ) + + trace_ids = self._get_trace_ids(pages) + self.trace_documents = self._get_trace_documents(trace_ids) + self.subsegments = self._get_subsegments() + + def get_annotation(self, key: str, value: Optional[Any] = None) -> List: + return [ + annotation + for annotation in self.annotations + if (value is not None and annotation.get(key) == value) or (value is None and key in annotation) + ] + + def get_metadata(self, key: str, namespace: str = "") -> List[Dict[str, Any]]: + seen = [] + for meta in self.metadata: + metadata = meta.get(namespace, {}) + if key in metadata: + seen.append(metadata) + return seen + + def get_subsegment(self, name: str) -> List: + return [seg for seg in self.subsegments if seg.name == name] + + def _find_nested_subsegments(self, subsegments: List[TraceSubsegment]) -> Generator[TraceSubsegment, None, None]: + """Recursively yield any subsegment we might be interested in. + + It excludes any subsegments contained in exclude_segment_name. + Since these are nested, subsegment name might be '## lambda_handler'. + + It also populates annotations and metadata nested in subsegments. + + Parameters + ---------- + subsegments : List[TraceSubsegment] + subsegments to traverse recursively; + annotations and metadata found along the way are + appended to self.annotations and self.metadata + """ + for seg in subsegments: + if seg.name not in self.exclude_segment_name: + if seg.annotations: + self.annotations.append(seg.annotations) + + if seg.metadata: + self.metadata.append(seg.metadata) + + yield seg + + if seg.subsegments: + # recursively iterate over any arbitrary number of subsegments + yield from self._find_nested_subsegments(seg.subsegments) + + def _get_subsegments(self) -> List[TraceSubsegment]: + """Find subsegments and potentially any nested subsegments + + It excludes any subsegments contained in exclude_segment_name. + Since these are top-level, subsegment names may include 'Overhead', 'Invocation', etc.
+ + Returns + ------- + List[TraceSubsegment] + List of subsegments + """ + seen = [] + for document in self.trace_documents.values(): + if document.subsegments: + seen.extend(self._find_nested_subsegments(document.subsegments)) + + return seen + + def _get_trace_ids(self, pages: PageIterator) -> List[str]: + """Get list of trace IDs found + + Parameters + ---------- + pages : PageIterator + Paginated streaming response from AWS X-Ray + + Returns + ------- + List[str] + Trace IDs + + Raises + ------ + ValueError + When no traces are available within time range and filter expression + """ + summaries: List[List[TraceSummaryTypeDef]] = [trace["TraceSummaries"] for trace in pages if trace["TraceSummaries"]] + if not summaries: + raise ValueError("Empty response from X-Ray. Repeating...") + + trace_ids = [trace["Id"] for trace in summaries[0]] # only the first page of summaries is used + if len(trace_ids) < self.minimum_traces: + raise ValueError( + f"Number of traces found doesn't meet minimum required ({self.minimum_traces}). Repeating..." + ) + + return trace_ids + + def _get_trace_documents(self, trace_ids: List[str]) -> Dict[str, TraceDocument]: + """Find trace documents available in each trace segment + + Returns + ------- + Dict[str, TraceDocument] + Trace documents grouped by their ID + """ + traces = self.xray_client.batch_get_traces(TraceIds=trace_ids) + documents: Dict = {} + segments = [seg for trace in traces["Traces"] for seg in trace["Segments"]] + for seg in segments: + trace_document = TraceDocument(**json.loads(seg["Document"])) + if trace_document.origin in self.origin or (self.resource_name and trace_document.name in self.resource_name): + documents[trace_document.id] = trace_document + return documents + + +@retry(ValueError, delay=5, jitter=1.5, tries=10) +def get_traces( + filter_expression: str, + start_date: datetime, + end_date: Optional[datetime] = None, + xray_client: Optional[XRayClient] = None, + exclude_segment_name: Optional[List[str]] = None, + resource_name: Optional[List[str]] = None, + origin: Optional[List[str]] = None, + minimum_traces: int = 1, +) -> TraceFetcher: + """Fetch traces from AWS X-Ray + + Parameters + ---------- + filter_expression : str + AWS X-Ray Filter Expressions + see: https://docs.aws.amazon.com/xray/latest/devguide/xray-console-filters.html + start_date : datetime + Start date range to filter traces + end_date : Optional[datetime], optional + End date range to filter traces, by default 5 minutes past start_date + xray_client : Optional[XRayClient], optional + AWS X-Ray SDK Client, by default boto3.client('xray') + exclude_segment_name : Optional[List[str]], optional + Name of segments to exclude, by default ["Initialization", "Invocation", "Overhead"] + resource_name : Optional[List[str]], optional + Name of resource to filter traces (e.g., function name), by default None + origin : Optional[List[str]], optional + Trace origin name to filter traces, by default ["AWS::Lambda::Function"] + minimum_traces : int + Minimum number of traces to be retrieved before exhausting retry attempts + + Returns + ------- + TraceFetcher + TraceFetcher instance with trace data available as properties and methods + """ + return TraceFetcher( + filter_expression=filter_expression, + start_date=start_date, + end_date=end_date, + xray_client=xray_client, + exclude_segment_name=exclude_segment_name, + resource_name=resource_name, + origin=origin, + minimum_traces=minimum_traces, + ) diff --git a/tests/e2e/utils/helpers.py b/tests/e2e/utils/helpers.py deleted file mode 100644
index 6827ac12d90..00000000000 --- a/tests/e2e/utils/helpers.py +++ /dev/null @@ -1,276 +0,0 @@ -import json -import secrets -from datetime import datetime, timedelta -from functools import lru_cache -from typing import Dict, List, Optional, Tuple, Union - -import boto3 -from mypy_boto3_cloudwatch.client import CloudWatchClient -from mypy_boto3_cloudwatch.type_defs import DimensionTypeDef, MetricDataQueryTypeDef, MetricDataResultTypeDef -from mypy_boto3_lambda.client import LambdaClient -from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef -from mypy_boto3_xray.client import XRayClient -from pydantic import BaseModel -from retry import retry - -# Helper methods && Class -from aws_lambda_powertools.metrics import MetricUnit - - -class Log(BaseModel): - level: str - location: str - message: Union[dict, str] - timestamp: str - service: str - cold_start: Optional[bool] - function_name: Optional[str] - function_memory_size: Optional[str] - function_arn: Optional[str] - function_request_id: Optional[str] - xray_trace_id: Optional[str] - extra_info: Optional[str] - - -class TraceSegment(BaseModel): - name: str - metadata: Dict = {} - annotations: Dict = {} - - -def trigger_lambda( - lambda_arn: str, payload: str, client: Optional[LambdaClient] = None -) -> Tuple[InvocationResponseTypeDef, datetime]: - client = client or boto3.client("lambda") - execution_time = datetime.utcnow() - return client.invoke(FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload), execution_time - - -@lru_cache(maxsize=10, typed=False) -@retry(ValueError, delay=1, jitter=1, tries=20) -def get_logs(lambda_function_name: str, log_client: CloudWatchClient, start_time: int, **kwargs: dict) -> List[Log]: - response = log_client.filter_log_events(logGroupName=f"/aws/lambda/{lambda_function_name}", startTime=start_time) - if not response["events"]: - raise ValueError("Empty response from Cloudwatch Logs. Repeating...") - filtered_logs = [] - for event in response["events"]: - try: - message = Log(**json.loads(event["message"])) - except json.decoder.JSONDecodeError: - continue - filtered_logs.append(message) - - return filtered_logs - - -@retry(ValueError, delay=2, jitter=1.5, tries=10) -def get_metrics( - namespace: str, - start_date: datetime, - metric_name: str, - dimensions: Optional[List[DimensionTypeDef]] = None, - cw_client: Optional[CloudWatchClient] = None, - end_date: Optional[datetime] = None, - period: int = 60, - stat: str = "Sum", -) -> MetricDataResultTypeDef: - """Fetch CloudWatch Metrics - - It takes into account eventual consistency with up to 10 retries and 1s jitter. 
- - Parameters - ---------- - namespace : str - Metric Namespace - start_date : datetime - Start window to fetch metrics - metric_name : str - Metric name - dimensions : Optional[List[DimensionTypeDef]], optional - List of Metric Dimension, by default None - cw_client : Optional[CloudWatchClient], optional - Boto3 CloudWatch low-level client (boto3.client("cloudwatch"), by default None - end_date : Optional[datetime], optional - End window to fetch metrics, by default start_date + 2 minutes window - period : int, optional - Time period to fetch metrics for, by default 60 - stat : str, optional - Aggregation function to use when fetching metrics, by default "Sum" - - Returns - ------- - MetricDataResultTypeDef - _description_ - - Raises - ------ - ValueError - When no metric is found within retry window - """ - cw_client = cw_client or boto3.client("cloudwatch") - end_date = end_date or start_date + timedelta(minutes=2) - - metric_query = build_metric_query_data( - namespace=namespace, metric_name=metric_name, period=period, stat=stat, dimensions=dimensions - ) - - response = cw_client.get_metric_data( - MetricDataQueries=metric_query, - StartTime=start_date, - EndTime=end_date or datetime.utcnow(), - ) - result = response["MetricDataResults"][0] - if not result["Values"]: - raise ValueError("Empty response from Cloudwatch. Repeating...") - return result - - -@retry(ValueError, delay=1, jitter=1, tries=10) -def get_traces( - filter_expression: str, start_date: datetime, end_date: datetime, xray_client: Optional[XRayClient] = None -) -> Dict: - xray_client = xray_client or boto3.client("xray") - paginator = xray_client.get_paginator("get_trace_summaries") - response_iterator = paginator.paginate( - StartTime=start_date, - EndTime=end_date, - TimeRangeType="Event", - Sampling=False, - FilterExpression=filter_expression, - ) - - traces = [trace["TraceSummaries"][0]["Id"] for trace in response_iterator if trace["TraceSummaries"]] - if not traces: - raise ValueError("Empty response from X-RAY. 
Repeating...") - - trace_details = xray_client.batch_get_traces( - TraceIds=traces, - ) - - return trace_details - - -def find_trace_additional_info(trace: Dict) -> List[TraceSegment]: - """Find all trace annotations and metadata and return them to the caller""" - info = [] - for segment in trace["Traces"][0]["Segments"]: - document = json.loads(segment["Document"]) - if document["origin"] == "AWS::Lambda::Function": - for subsegment in document["subsegments"]: - if subsegment["name"] == "Invocation": - find_meta(segment=subsegment, result=info) - return info - - -def find_meta(segment: dict, result: List): - for x_subsegment in segment["subsegments"]: - result.append( - TraceSegment( - name=x_subsegment["name"], - metadata=x_subsegment.get("metadata", {}), - annotations=x_subsegment.get("annotations", {}), - ) - ) - if x_subsegment.get("subsegments"): - find_meta(segment=x_subsegment, result=result) - - -# Maintenance: Build a separate module for builders -def build_metric_name() -> str: - return f"test_metric{build_random_value()}" - - -def build_service_name() -> str: - return f"test_service{build_random_value()}" - - -def build_random_value(nbytes: int = 10) -> str: - return secrets.token_urlsafe(nbytes).replace("-", "") - - -def build_metric_query_data( - namespace: str, - metric_name: str, - period: int = 60, - stat: str = "Sum", - dimensions: Optional[List[DimensionTypeDef]] = None, -) -> List[MetricDataQueryTypeDef]: - dimensions = dimensions or [] - data_query: List[MetricDataQueryTypeDef] = [ - { - "Id": metric_name.lower(), - "MetricStat": { - "Metric": {"Namespace": namespace, "MetricName": metric_name}, - "Period": period, - "Stat": stat, - }, - "ReturnData": True, - } - ] - - if dimensions: - data_query[0]["MetricStat"]["Metric"]["Dimensions"] = dimensions - - return data_query - - -def build_add_metric_input(metric_name: str, value: float, unit: str = MetricUnit.Count.value) -> Dict: - """Create a metric input to be used with Metrics.add_metric() - - Parameters - ---------- - metric_name : str - metric name - value : float - metric value - unit : str, optional - metric unit, by default Count - - Returns - ------- - Dict - Metric input - """ - return {"name": metric_name, "unit": unit, "value": value} - - -def build_multiple_add_metric_input( - metric_name: str, value: float, unit: str = MetricUnit.Count.value, quantity: int = 1 -) -> Dict: - """Create list of metrics input to be used with Metrics.add_metric() - - Parameters - ---------- - metric_name : str - metric name - value : float - metric value - unit : str, optional - metric unit, by default Count - quantity : int, optional - number of metrics to be created, by default 1 - - Returns - ------- - List[Dict] - List of metrics - """ - return [{"name": metric_name, "unit": unit, "value": value} for _ in range(quantity)] - - -def build_add_dimensions_input(**dimensions) -> List[DimensionTypeDef]: - """Create dimensions input to be used with either get_metrics or Metrics.add_dimension() - - Parameters - ---------- - name : str - dimension name - value : float - dimension value - - Returns - ------- - Dict - Metric dimension input - """ - return [{"Name": name, "Value": value} for name, value in dimensions.items()] diff --git a/tests/e2e/utils/infrastructure.py b/tests/e2e/utils/infrastructure.py index fb3c3c02ce2..ced6d70a1ad 100644 --- a/tests/e2e/utils/infrastructure.py +++ b/tests/e2e/utils/infrastructure.py @@ -312,12 +312,12 @@ def deploy(self) -> Dict[str, str]: assets.upload() return 
self._deploy_stack(self.stack_name, template) - def delete(self): + def delete(self) -> None: """Delete CloudFormation Stack""" self.cfn.delete_stack(StackName=self.stack_name) @abstractmethod - def create_resources(self): + def create_resources(self) -> None: """Create any necessary CDK resources. It'll be called before deploy Examples diff --git a/tests/e2e/utils/models.py b/tests/e2e/utils/models.py deleted file mode 100644 index 0c3f81070d5..00000000000 --- a/tests/e2e/utils/models.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import Dict - -from pydantic import BaseModel, Field - - -class AssetTemplateConfigSource(BaseModel): - path: str - packaging: str - - -class AssetTemplateConfigDestinationsAccount(BaseModel): - bucket_name: str = Field(str, alias="bucketName") - object_key: str = Field(str, alias="objectKey") - assume_role_arn: str = Field(str, alias="assumeRoleArn") - - -class AssetTemplateConfigDestinations(BaseModel): - current_account_current_region: AssetTemplateConfigDestinationsAccount = Field( - AssetTemplateConfigDestinationsAccount, alias="current_account-current_region" - ) - - -class AssetTemplateConfig(BaseModel): - source: AssetTemplateConfigSource - destinations: AssetTemplateConfigDestinations - - -class TemplateAssembly(BaseModel): - version: str - files: Dict[str, AssetTemplateConfig] - docker_images: Dict = Field(Dict, alias="dockerImages")
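Note: with helpers.py and models.py removed, wiring a new feature's E2E suite follows the tracer pattern established above. A minimal sketch for a hypothetical "batch" feature (the feature name, stack class, and service name are illustrative; deploy_once and BaseInfrastructureV2 are used exactly as in tests/e2e/tracer):

from pathlib import Path

import pytest

from tests.e2e.utils.infrastructure import BaseInfrastructureV2, deploy_once


# tests/e2e/batch/infrastructure.py (hypothetical)
class BatchStack(BaseInfrastructureV2):
    def __init__(self, handlers_dir: Path, feature_name: str = "batch") -> None:
        super().__init__(feature_name, handlers_dir)

    def create_resources(self) -> None:
        # Mirrors TracerStack: one Lambda function per handler file, plus feature-specific props
        self.create_lambda_functions(function_props={"environment": {"POWERTOOLS_SERVICE_NAME": "batch-e2e"}})


# tests/e2e/batch/conftest.py (hypothetical)
@pytest.fixture(autouse=True, scope="module")
def infrastructure(request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory, worker_id: str):
    # deploy_once shares a single deployment across pytest-xdist workers and yields stack outputs
    yield from deploy_once(stack=BatchStack, request=request, tmp_path_factory=tmp_path_factory, worker_id=worker_id)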