diff --git a/aws_lambda_powertools/metrics/base.py b/aws_lambda_powertools/metrics/base.py index 73b13e33e5c..7304afa5a42 100644 --- a/aws_lambda_powertools/metrics/base.py +++ b/aws_lambda_powertools/metrics/base.py @@ -9,7 +9,7 @@ import warnings from collections import defaultdict from contextlib import contextmanager -from typing import Any, Callable, Dict, Generator, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Generator from aws_lambda_powertools.metrics.exceptions import ( MetricResolutionError, @@ -24,10 +24,12 @@ from aws_lambda_powertools.metrics.provider.cold_start import ( reset_cold_start_flag, # noqa: F401 # backwards compatibility ) -from aws_lambda_powertools.metrics.types import MetricNameUnitResolution from aws_lambda_powertools.shared import constants from aws_lambda_powertools.shared.functions import resolve_env_var_choice +if TYPE_CHECKING: + from aws_lambda_powertools.metrics.types import MetricNameUnitResolution + logger = logging.getLogger(__name__) # Maintenance: alias due to Hyrum's law @@ -66,10 +68,10 @@ class MetricManager: def __init__( self, - metric_set: Dict[str, Any] | None = None, - dimension_set: Dict | None = None, + metric_set: dict[str, Any] | None = None, + dimension_set: dict | None = None, namespace: str | None = None, - metadata_set: Dict[str, Any] | None = None, + metadata_set: dict[str, Any] | None = None, service: str | None = None, ): self.metric_set = metric_set if metric_set is not None else {} @@ -110,11 +112,11 @@ def add_metric( ---------- name : str Metric name - unit : Union[MetricUnit, str] + unit : MetricUnit | str `aws_lambda_powertools.helper.models.MetricUnit` value : float Metric value - resolution : Union[MetricResolution, int] + resolution : MetricResolution | int `aws_lambda_powertools.helper.models.MetricResolution` Raises @@ -129,7 +131,7 @@ def add_metric( unit = self._extract_metric_unit_value(unit=unit) resolution = self._extract_metric_resolution_value(resolution=resolution) - metric: Dict = self.metric_set.get(name, defaultdict(list)) + metric: dict = self.metric_set.get(name, defaultdict(list)) metric["Unit"] = unit metric["StorageResolution"] = resolution metric["Value"].append(float(value)) @@ -147,19 +149,19 @@ def add_metric( def serialize_metric_set( self, - metrics: Dict | None = None, - dimensions: Dict | None = None, - metadata: Dict | None = None, - ) -> Dict: + metrics: dict | None = None, + dimensions: dict | None = None, + metadata: dict | None = None, + ) -> dict: """Serializes metric and dimensions set Parameters ---------- - metrics : Dict, optional + metrics : dict, optional Dictionary of metrics to serialize, by default None - dimensions : Dict, optional + dimensions : dict, optional Dictionary of dimensions to serialize, by default None - metadata: Dict, optional + metadata: dict, optional Dictionary of metadata to serialize, by default None Example @@ -172,7 +174,7 @@ def serialize_metric_set( Returns ------- - Dict + dict Serialized metrics following EMF specification Raises @@ -206,8 +208,8 @@ def serialize_metric_set( # # In case using high-resolution metrics, add StorageResolution field # Example: [ { "Name": "metric_name", "Unit": "Count", "StorageResolution": 1 } ] # noqa ERA001 - metric_definition: List[MetricNameUnitResolution] = [] - metric_names_and_values: Dict[str, float] = {} # { "metric_name": 1.0 } + metric_definition: list[MetricNameUnitResolution] = [] + metric_names_and_values: dict[str, float] = {} # { "metric_name": 1.0 } for metric_name in metrics: metric: dict = metrics[metric_name] @@ -354,10 +356,10 @@ def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None: def log_metrics( self, - lambda_handler: Callable[[Dict, Any], Any] | Optional[Callable[[Dict, Any, Optional[Dict]], Any]] = None, + lambda_handler: Callable[[dict, Any], Any] | Callable[[dict, Any, dict | None], Any] | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, - default_dimensions: Dict[str, str] | None = None, + default_dimensions: dict[str, str] | None = None, ): """Decorator to serialize and publish metrics at the end of a function execution. @@ -385,7 +387,7 @@ def handler(event, context): captures cold start metric, by default False raise_on_empty_metrics : bool, optional raise exception if no metrics are emitted, by default False - default_dimensions: Dict[str, str], optional + default_dimensions: dict[str, str], optional metric dimensions as key=value that will always be present Raises @@ -420,12 +422,12 @@ def decorate(event, context, *args, **kwargs): return decorate - def _extract_metric_resolution_value(self, resolution: Union[int, MetricResolution]) -> int: + def _extract_metric_resolution_value(self, resolution: int | MetricResolution) -> int: """Return metric value from metric unit whether that's str or MetricResolution enum Parameters ---------- - unit : Union[int, MetricResolution] + unit : int | MetricResolution Metric resolution Returns @@ -448,12 +450,12 @@ def _extract_metric_resolution_value(self, resolution: Union[int, MetricResoluti f"Invalid metric resolution '{resolution}', expected either option: {self._metric_resolutions}", # noqa: E501 ) - def _extract_metric_unit_value(self, unit: Union[str, MetricUnit]) -> str: + def _extract_metric_unit_value(self, unit: str | MetricUnit) -> str: """Return metric value from metric unit whether that's str or MetricUnit enum Parameters ---------- - unit : Union[str, MetricUnit] + unit : str | MetricUnit Metric unit Returns @@ -566,7 +568,7 @@ def single_metric( value: float, resolution: MetricResolution | int = 60, namespace: str | None = None, - default_dimensions: Dict[str, str] | None = None, + default_dimensions: dict[str, str] | None = None, ) -> Generator[SingleMetric, None, None]: """Context manager to simplify creation of a single metric @@ -604,7 +606,7 @@ def single_metric( Metric value namespace: str Namespace for metrics - default_dimensions: Dict[str, str], optional + default_dimensions: dict[str, str], optional Metric dimensions as key=value that will always be present @@ -624,7 +626,7 @@ def single_metric( SchemaValidationError When metric object fails EMF schema validation """ # noqa: E501 - metric_set: Dict | None = None + metric_set: dict | None = None try: metric: SingleMetric = SingleMetric(namespace=namespace) metric.add_metric(name=name, unit=unit, value=value, resolution=resolution) diff --git a/aws_lambda_powertools/metrics/functions.py b/aws_lambda_powertools/metrics/functions.py index 6b00c608c36..14c68e88275 100644 --- a/aws_lambda_powertools/metrics/functions.py +++ b/aws_lambda_powertools/metrics/functions.py @@ -1,7 +1,6 @@ from __future__ import annotations from datetime import datetime -from typing import List from aws_lambda_powertools.metrics.provider.cloudwatch_emf.exceptions import ( MetricResolutionError, @@ -11,12 +10,12 @@ from aws_lambda_powertools.shared import constants -def extract_cloudwatch_metric_resolution_value(metric_resolutions: List, resolution: int | MetricResolution) -> int: +def extract_cloudwatch_metric_resolution_value(metric_resolutions: list, resolution: int | MetricResolution) -> int: """Return metric value from CloudWatch metric unit whether that's str or MetricResolution enum Parameters ---------- - unit : Union[int, MetricResolution] + resolution : int | MetricResolution Metric resolution Returns @@ -40,12 +39,12 @@ def extract_cloudwatch_metric_resolution_value(metric_resolutions: List, resolut ) -def extract_cloudwatch_metric_unit_value(metric_units: List, metric_valid_options: List, unit: str | MetricUnit) -> str: +def extract_cloudwatch_metric_unit_value(metric_units: list, metric_valid_options: list, unit: str | MetricUnit) -> str: """Return metric value from CloudWatch metric unit whether that's str or MetricUnit enum Parameters ---------- - unit : Union[str, MetricUnit] + unit : str | MetricUnit Metric unit Returns diff --git a/aws_lambda_powertools/metrics/metrics.py b/aws_lambda_powertools/metrics/metrics.py index 05d9010684c..8674f053bd4 100644 --- a/aws_lambda_powertools/metrics/metrics.py +++ b/aws_lambda_powertools/metrics/metrics.py @@ -1,12 +1,14 @@ # NOTE: keeps for compatibility from __future__ import annotations -from typing import Any, Dict +from typing import TYPE_CHECKING, Any -from aws_lambda_powertools.metrics.base import MetricResolution, MetricUnit from aws_lambda_powertools.metrics.provider.cloudwatch_emf.cloudwatch import AmazonCloudWatchEMFProvider -from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput -from aws_lambda_powertools.shared.types import AnyCallableT + +if TYPE_CHECKING: + from aws_lambda_powertools.metrics.base import MetricResolution, MetricUnit + from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput + from aws_lambda_powertools.shared.types import AnyCallableT class Metrics: @@ -72,10 +74,10 @@ def lambda_handler(): # and not get caught by accident with metrics data loss, or data deduplication # e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension but m2 doesn't # Result: ProductCreated is created twice as we now have 2 different EMF blobs - _metrics: Dict[str, Any] = {} - _dimensions: Dict[str, str] = {} - _metadata: Dict[str, Any] = {} - _default_dimensions: Dict[str, Any] = {} + _metrics: dict[str, Any] = {} + _dimensions: dict[str, str] = {} + _metadata: dict[str, Any] = {} + _default_dimensions: dict[str, Any] = {} def __init__( self, @@ -116,9 +118,9 @@ def add_dimension(self, name: str, value: str) -> None: def serialize_metric_set( self, - metrics: Dict | None = None, - dimensions: Dict | None = None, - metadata: Dict | None = None, + metrics: dict | None = None, + dimensions: dict | None = None, + metadata: dict | None = None, ) -> CloudWatchEMFOutput: return self.provider.serialize_metric_set(metrics=metrics, dimensions=dimensions, metadata=metadata) @@ -146,7 +148,7 @@ def log_metrics( lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, - default_dimensions: Dict[str, str] | None = None, + default_dimensions: dict[str, str] | None = None, **kwargs, ): return self.provider.log_metrics( @@ -163,7 +165,7 @@ def set_default_dimensions(self, **dimensions) -> None: Parameters ---------- - dimensions : Dict[str, Any], optional + dimensions : dict[str, Any], optional metric dimensions as key=value Example diff --git a/aws_lambda_powertools/metrics/provider/base.py b/aws_lambda_powertools/metrics/provider/base.py index ea61a5ec4d7..3aab6e7561e 100644 --- a/aws_lambda_powertools/metrics/provider/base.py +++ b/aws_lambda_powertools/metrics/provider/base.py @@ -3,11 +3,13 @@ import functools import logging from abc import ABC, abstractmethod -from typing import Any +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.metrics.provider import cold_start -from aws_lambda_powertools.shared.types import AnyCallableT -from aws_lambda_powertools.utilities.typing import LambdaContext + +if TYPE_CHECKING: + from aws_lambda_powertools.shared.types import AnyCallableT + from aws_lambda_powertools.utilities.typing import LambdaContext logger = logging.getLogger(__name__) @@ -40,7 +42,7 @@ def add_metric(self, *args: Any, **kwargs: Any) -> Any: Returns ---------- - Dict + dict A combined metrics dictionary. Raises @@ -66,7 +68,7 @@ def serialize_metric_set(self, *args: Any, **kwargs: Any) -> Any: Returns ---------- - Dict + dict Serialized metrics Raises @@ -172,7 +174,7 @@ def handler(event, context): captures cold start metric, by default False raise_on_empty_metrics : bool, optional raise exception if no metrics are emitted, by default False - default_dimensions: Dict[str, str], optional + default_dimensions: dict[str, str], optional metric dimensions as key=value that will always be present Raises diff --git a/aws_lambda_powertools/metrics/provider/cloudwatch_emf/cloudwatch.py b/aws_lambda_powertools/metrics/provider/cloudwatch_emf/cloudwatch.py index d59026ebf69..5da02528aab 100644 --- a/aws_lambda_powertools/metrics/provider/cloudwatch_emf/cloudwatch.py +++ b/aws_lambda_powertools/metrics/provider/cloudwatch_emf/cloudwatch.py @@ -7,7 +7,7 @@ import os import warnings from collections import defaultdict -from typing import Any, Dict, List +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.metrics.base import single_metric from aws_lambda_powertools.metrics.exceptions import MetricValueError, SchemaValidationError @@ -20,12 +20,14 @@ from aws_lambda_powertools.metrics.provider.base import BaseProvider from aws_lambda_powertools.metrics.provider.cloudwatch_emf.constants import MAX_DIMENSIONS, MAX_METRICS from aws_lambda_powertools.metrics.provider.cloudwatch_emf.metric_properties import MetricResolution, MetricUnit -from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput -from aws_lambda_powertools.metrics.types import MetricNameUnitResolution from aws_lambda_powertools.shared import constants from aws_lambda_powertools.shared.functions import resolve_env_var_choice -from aws_lambda_powertools.shared.types import AnyCallableT -from aws_lambda_powertools.utilities.typing import LambdaContext + +if TYPE_CHECKING: + from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput + from aws_lambda_powertools.metrics.types import MetricNameUnitResolution + from aws_lambda_powertools.shared.types import AnyCallableT + from aws_lambda_powertools.utilities.typing import LambdaContext logger = logging.getLogger(__name__) @@ -62,12 +64,12 @@ class AmazonCloudWatchEMFProvider(BaseProvider): def __init__( self, - metric_set: Dict[str, Any] | None = None, - dimension_set: Dict | None = None, + metric_set: dict[str, Any] | None = None, + dimension_set: dict | None = None, namespace: str | None = None, - metadata_set: Dict[str, Any] | None = None, + metadata_set: dict[str, Any] | None = None, service: str | None = None, - default_dimensions: Dict[str, Any] | None = None, + default_dimensions: dict[str, Any] | None = None, ): self.metric_set = metric_set if metric_set is not None else {} self.dimension_set = dimension_set if dimension_set is not None else {} @@ -110,11 +112,11 @@ def add_metric( ---------- name : str Metric name - unit : Union[MetricUnit, str] + unit : MetricUnit | str `aws_lambda_powertools.helper.models.MetricUnit` value : float Metric value - resolution : Union[MetricResolution, int] + resolution : MetricResolution | int `aws_lambda_powertools.helper.models.MetricResolution` Raises @@ -136,7 +138,7 @@ def add_metric( metric_resolutions=self._metric_resolutions, resolution=resolution, ) - metric: Dict = self.metric_set.get(name, defaultdict(list)) + metric: dict = self.metric_set.get(name, defaultdict(list)) metric["Unit"] = unit metric["StorageResolution"] = resolution metric["Value"].append(float(value)) @@ -154,19 +156,19 @@ def add_metric( def serialize_metric_set( self, - metrics: Dict | None = None, - dimensions: Dict | None = None, - metadata: Dict | None = None, + metrics: dict | None = None, + dimensions: dict | None = None, + metadata: dict | None = None, ) -> CloudWatchEMFOutput: """Serializes metric and dimensions set Parameters ---------- - metrics : Dict, optional + metrics : dict, optional Dictionary of metrics to serialize, by default None - dimensions : Dict, optional + dimensions : dict, optional Dictionary of dimensions to serialize, by default None - metadata: Dict, optional + metadata: dict, optional Dictionary of metadata to serialize, by default None Example @@ -179,7 +181,7 @@ def serialize_metric_set( Returns ------- - Dict + CloudWatchEMFOutput Serialized metrics following EMF specification Raises @@ -213,8 +215,8 @@ def serialize_metric_set( # # In case using high-resolution metrics, add StorageResolution field # Example: [ { "Name": "metric_name", "Unit": "Count", "StorageResolution": 1 } ] # noqa ERA001 - metric_definition: List[MetricNameUnitResolution] = [] - metric_names_and_values: Dict[str, float] = {} # { "metric_name": 1.0 } + metric_definition: list[MetricNameUnitResolution] = [] + metric_names_and_values: dict[str, float] = {} # { "metric_name": 1.0 } for metric_name in metrics: metric: dict = metrics[metric_name] @@ -433,7 +435,7 @@ def set_default_dimensions(self, **dimensions) -> None: Parameters ---------- - dimensions : Dict[str, Any], optional + dimensions : dict[str, Any], optional metric dimensions as key=value Example diff --git a/aws_lambda_powertools/metrics/provider/cloudwatch_emf/types.py b/aws_lambda_powertools/metrics/provider/cloudwatch_emf/types.py index 669e931ff3e..6c94af94cf4 100644 --- a/aws_lambda_powertools/metrics/provider/cloudwatch_emf/types.py +++ b/aws_lambda_powertools/metrics/provider/cloudwatch_emf/types.py @@ -1,4 +1,6 @@ -from typing import List, TypedDict +from __future__ import annotations + +from typing import TypedDict from typing_extensions import NotRequired @@ -11,13 +13,13 @@ class CloudWatchEMFMetric(TypedDict): class CloudWatchEMFMetrics(TypedDict): Namespace: str - Dimensions: List[List[str]] # [ [ 'test_dimension' ] ] - Metrics: List[CloudWatchEMFMetric] + Dimensions: list[list[str]] # [ [ 'test_dimension' ] ] + Metrics: list[CloudWatchEMFMetric] class CloudWatchEMFRoot(TypedDict): Timestamp: int - CloudWatchMetrics: List[CloudWatchEMFMetrics] + CloudWatchMetrics: list[CloudWatchEMFMetrics] class CloudWatchEMFOutput(TypedDict): diff --git a/aws_lambda_powertools/metrics/provider/datadog/datadog.py b/aws_lambda_powertools/metrics/provider/datadog/datadog.py index 1e527a1ddb9..d79782363f4 100644 --- a/aws_lambda_powertools/metrics/provider/datadog/datadog.py +++ b/aws_lambda_powertools/metrics/provider/datadog/datadog.py @@ -7,15 +7,17 @@ import re import time import warnings -from typing import Any, Dict, List +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.metrics.exceptions import MetricValueError, SchemaValidationError from aws_lambda_powertools.metrics.provider import BaseProvider from aws_lambda_powertools.metrics.provider.datadog.warnings import DatadogDataValidationWarning from aws_lambda_powertools.shared import constants from aws_lambda_powertools.shared.functions import resolve_env_var_choice -from aws_lambda_powertools.shared.types import AnyCallableT -from aws_lambda_powertools.utilities.typing import LambdaContext + +if TYPE_CHECKING: + from aws_lambda_powertools.shared.types import AnyCallableT + from aws_lambda_powertools.utilities.typing import LambdaContext METRIC_NAME_REGEX = re.compile(r"^[a-zA-Z0-9_.]+$") @@ -51,10 +53,10 @@ class DatadogProvider(BaseProvider): def __init__( self, - metric_set: List | None = None, + metric_set: list | None = None, namespace: str | None = None, flush_to_log: bool | None = None, - default_tags: Dict[str, Any] | None = None, + default_tags: dict[str, Any] | None = None, ): self.metric_set = metric_set if metric_set is not None else [] self.namespace = ( @@ -83,8 +85,8 @@ def add_metric( Value for the metrics timestamp: int Timestamp in int for the metrics, default = time.time() - tags: List[str] - In format like List["tag:value","tag2:value2"] + tags: list[str] + In format like ["tag:value", "tag2:value2"] args: Any extra args will be dropped for compatibility kwargs: Any @@ -122,7 +124,7 @@ def add_metric( logger.debug({"details": "Appending metric", "metrics": name}) self.metric_set.append({"m": name, "v": value, "e": timestamp, "t": tags}) - def serialize_metric_set(self, metrics: List | None = None) -> List: + def serialize_metric_set(self, metrics: list | None = None) -> list: """Serializes metrics Example @@ -135,7 +137,7 @@ def serialize_metric_set(self, metrics: List | None = None) -> List: Returns ------- - List + list Serialized metrics following Datadog specification Raises @@ -150,7 +152,7 @@ def serialize_metric_set(self, metrics: List | None = None) -> List: if len(metrics) == 0: raise SchemaValidationError("Must contain at least one metric.") - output_list: List = [] + output_list: list = [] logger.debug({"details": "Serializing metrics", "metrics": metrics}) @@ -304,7 +306,7 @@ def lambda_handler(): self.default_tags.update(**tags) @staticmethod - def _serialize_datadog_tags(metric_tags: Dict[str, Any], default_tags: Dict[str, Any]) -> List[str]: + def _serialize_datadog_tags(metric_tags: dict[str, Any], default_tags: dict[str, Any]) -> list[str]: """ Serialize metric tags into a list of formatted strings for Datadog integration. @@ -313,14 +315,14 @@ def _serialize_datadog_tags(metric_tags: Dict[str, Any], default_tags: Dict[str, Parameters ---------- - metric_tags: Dict[str, Any] + metric_tags: dict[str, Any] A dictionary containing metric-specific tags. - default_tags: Dict[str, Any] + default_tags: dict[str, Any] A dictionary containing default tags applicable to all metrics. Returns: ------- - List[str] + list[str] A list of formatted tag strings, each in the "tag_key:tag_value" format. Example: @@ -337,7 +339,7 @@ def _serialize_datadog_tags(metric_tags: Dict[str, Any], default_tags: Dict[str, return [f"{tag_key}:{tag_value}" for tag_key, tag_value in tags.items()] @staticmethod - def _validate_datadog_tags_name(tags: Dict): + def _validate_datadog_tags_name(tags: dict): """ Validate a metric tag according to specific requirements. @@ -348,7 +350,7 @@ def _validate_datadog_tags_name(tags: Dict): Parameters: ---------- - tags: Dict + tags: dict The metric tags to be validated. """ for tag_key, tag_value in tags.items(): diff --git a/aws_lambda_powertools/metrics/provider/datadog/metrics.py b/aws_lambda_powertools/metrics/provider/datadog/metrics.py index 7539b0336be..50a0eec4b8f 100644 --- a/aws_lambda_powertools/metrics/provider/datadog/metrics.py +++ b/aws_lambda_powertools/metrics/provider/datadog/metrics.py @@ -1,10 +1,12 @@ # NOTE: keeps for compatibility from __future__ import annotations -from typing import Any, Dict, List +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.metrics.provider.datadog.datadog import DatadogProvider -from aws_lambda_powertools.shared.types import AnyCallableT + +if TYPE_CHECKING: + from aws_lambda_powertools.shared.types import AnyCallableT class DatadogMetrics: @@ -53,8 +55,8 @@ def lambda_handler(): # and not get caught by accident with metrics data loss, or data deduplication # e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension but m2 doesn't # Result: ProductCreated is created twice as we now have 2 different EMF blobs - _metrics: List = [] - _default_tags: Dict[str, Any] = {} + _metrics: list = [] + _default_tags: dict[str, Any] = {} def __init__( self, @@ -83,7 +85,7 @@ def add_metric( ) -> None: self.provider.add_metric(name=name, value=value, timestamp=timestamp, **tags) - def serialize_metric_set(self, metrics: List | None = None) -> List: + def serialize_metric_set(self, metrics: list | None = None) -> list: return self.provider.serialize_metric_set(metrics=metrics) def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None: @@ -94,7 +96,7 @@ def log_metrics( lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, - default_tags: Dict[str, Any] | None = None, + default_tags: dict[str, Any] | None = None, ): return self.provider.log_metrics( lambda_handler=lambda_handler,