Skip to content

refactor(metrics): move from protocol to ABC; split provider tests #2934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 changes: 2 additions & 1 deletion aws_lambda_powertools/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from aws_lambda_powertools.metrics.base import MetricResolution, MetricUnit
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.cloudwatch import AmazonCloudWatchEMFProvider
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput


class Metrics:
Expand Down Expand Up @@ -115,7 +116,7 @@ def serialize_metric_set(
metrics: Dict | None = None,
dimensions: Dict | None = None,
metadata: Dict | None = None,
) -> Dict:
) -> CloudWatchEMFOutput:
return self.provider.serialize_metric_set(metrics=metrics, dimensions=dimensions, metadata=metadata)

def add_metadata(self, key: str, value: Any) -> None:
Expand Down
16 changes: 7 additions & 9 deletions aws_lambda_powertools/metrics/provider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
from typing import Any, Callable, Dict, Optional

from aws_lambda_powertools.metrics.provider import cold_start
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = logging.getLogger(__name__)


class BaseProvider(ABC):
"""
Class for metric provider interface.
Interface to create a metrics provider.

This class serves as an interface for creating your own metric provider. Inherit from this class
and implement the required methods to define your specific metric provider.
BaseProvider implements `log_metrics` decorator for every provider as a value add feature.

Usage:
1. Inherit from this class.
Expand Down Expand Up @@ -97,11 +97,11 @@ def flush_metrics(self, *args: Any, **kwargs) -> Any:
raise NotImplementedError

@abstractmethod
def clear_metrics(self, *args: Any, **kwargs) -> Any:
def clear_metrics(self, *args: Any, **kwargs) -> None:
"""
Abstract method for clear metric instance.

This method must be implemented in subclasses to add a metric and return a combined metrics dictionary.
This method must be implemented in subclasses to clear the metric instance

Parameters
----------
Expand All @@ -118,7 +118,7 @@ def clear_metrics(self, *args: Any, **kwargs) -> Any:
raise NotImplementedError

@abstractmethod
def add_cold_start_metric(self, context: Any) -> Any:
def add_cold_start_metric(self, context: LambdaContext) -> Any:
"""
Abstract method for clear metric instance.

Expand Down Expand Up @@ -196,8 +196,6 @@ def handler(event, context):
@functools.wraps(lambda_handler)
def decorate(event, context):
try:
if default_dimensions:
self.set_default_dimensions(**default_dimensions)
response = lambda_handler(event, context)
if capture_cold_start_metric:
self._add_cold_start_metric(context=context)
Expand All @@ -210,7 +208,7 @@ def decorate(event, context):

def _add_cold_start_metric(self, context: Any) -> None:
"""
Check if it's cold start and add a metric if yes
Add cold start metric

Parameters
----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
from aws_lambda_powertools.metrics.provider.base import BaseProvider
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.constants import MAX_DIMENSIONS, MAX_METRICS
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.metric_properties import MetricResolution, MetricUnit
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import CloudWatchEMFOutput
from aws_lambda_powertools.metrics.types import MetricNameUnitResolution
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import resolve_env_var_choice
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = logging.getLogger(__name__)


class AmazonCloudWatchEMFProvider(BaseProvider):
"""
AmazonCloudWatchEMFProvider class (namespace, metric, dimension, serialization)
AmazonCloudWatchEMFProvider creates metrics asynchronously via CloudWatch Embedded Metric Format (EMF).

AmazonCloudWatchEMFProvider creates metrics asynchronously thanks to CloudWatch Embedded Metric Format (EMF).
CloudWatch EMF can create up to 100 metrics per EMF object
Expand Down Expand Up @@ -152,7 +154,7 @@ def serialize_metric_set(
metrics: Dict | None = None,
dimensions: Dict | None = None,
metadata: Dict | None = None,
) -> Dict:
) -> CloudWatchEMFOutput:
"""Serializes metric and dimensions set

Parameters
Expand Down Expand Up @@ -238,7 +240,8 @@ def serialize_metric_set(
},
],
},
**dimensions, # "service": "test_service"
# NOTE: Mypy doesn't recognize splats '** syntax' in TypedDict
**dimensions, # type: ignore[misc] # "service": "test_service"
**metadata, # "username": "test"
**metric_names_and_values, # "single_metric": 1.0
}
Expand Down Expand Up @@ -371,14 +374,19 @@ def handler(event, context):
Propagate error received
"""

default_dimensions = kwargs.get("default_dimensions")

if default_dimensions:
self.set_default_dimensions(**default_dimensions)

return super().log_metrics(
lambda_handler=lambda_handler,
capture_cold_start_metric=capture_cold_start_metric,
raise_on_empty_metrics=raise_on_empty_metrics,
**kwargs,
)

def add_cold_start_metric(self, context: Any) -> None:
def add_cold_start_metric(self, context: LambdaContext) -> None:
"""Add cold start metric and function_name dimension

Parameters
Expand Down
24 changes: 24 additions & 0 deletions aws_lambda_powertools/metrics/provider/cloudwatch_emf/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import List

from typing_extensions import NotRequired, TypedDict


class CloudWatchEMFMetric(TypedDict):
Name: str
Unit: str
StorageResolution: NotRequired[int]


class CloudWatchEMFMetrics(TypedDict):
Namespace: str
Dimensions: List[List[str]] # [ [ 'test_dimension' ] ]
Metrics: List[CloudWatchEMFMetric]


class CloudWatchEMFRoot(TypedDict):
Timestamp: int
CloudWatchMetrics: List[CloudWatchEMFMetrics]


class CloudWatchEMFOutput(TypedDict):
_aws: CloudWatchEMFRoot
26 changes: 20 additions & 6 deletions tests/functional/metrics/test_metrics_cloudwatch_emf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import warnings
from collections import namedtuple
Expand All @@ -16,16 +18,23 @@
SchemaValidationError,
single_metric,
)
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.cloudwatch import AmazonCloudWatchEMFProvider
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.constants import MAX_DIMENSIONS
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.cloudwatch import (
AmazonCloudWatchEMFProvider,
)
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.constants import (
MAX_DIMENSIONS,
)
from aws_lambda_powertools.metrics.provider.cloudwatch_emf.types import (
CloudWatchEMFOutput,
)


def serialize_metrics(
metrics: List[Dict],
dimensions: List[Dict],
namespace: str,
metadatas: List[Dict] = None,
) -> Dict:
metadatas: List[Dict] | None = None,
) -> CloudWatchEMFOutput:
"""Helper function to build EMF object from a list of metrics, dimensions"""
my_metrics = AmazonCloudWatchEMFProvider(namespace=namespace)
for dimension in dimensions:
Expand All @@ -42,7 +51,12 @@ def serialize_metrics(
return my_metrics.serialize_metric_set()


def serialize_single_metric(metric: Dict, dimension: Dict, namespace: str, metadata: Dict = None) -> Dict:
def serialize_single_metric(
metric: Dict,
dimension: Dict,
namespace: str,
metadata: Dict | None = None,
) -> CloudWatchEMFOutput:
"""Helper function to build EMF object from a given metric, dimension and namespace"""
my_metrics = AmazonCloudWatchEMFProvider(namespace=namespace)
my_metrics.add_metric(**metric)
Expand All @@ -64,7 +78,7 @@ def capture_metrics_output(capsys):
return json.loads(capsys.readouterr().out.strip())


def capture_metrics_output_multiple_emf_objects(capsys):
def capture_metrics_output_multiple_emf_objects(capsys) -> List[CloudWatchEMFOutput]:
return [json.loads(line.strip()) for line in capsys.readouterr().out.split("\n") if line]


Expand Down
Loading