Skip to content

feat(metrics): add EphemeralMetrics as a non-singleton option #1676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 changes: 2 additions & 1 deletion aws_lambda_powertools/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from .base import MetricUnit
from .exceptions import MetricUnitError, MetricValueError, SchemaValidationError
from .metric import single_metric
from .metrics import Metrics
from .metrics import EphemeralMetrics, Metrics

__all__ = [
"Metrics",
"EphemeralMetrics",
"single_metric",
"MetricUnit",
"MetricUnitError",
Expand Down
230 changes: 225 additions & 5 deletions aws_lambda_powertools/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import datetime
import functools
import json
import logging
import numbers
import os
import warnings
from collections import defaultdict
from contextlib import contextmanager
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, Generator, List, Optional, Union

from ..shared import constants
from ..shared.functions import resolve_env_var_choice
Expand All @@ -16,6 +19,8 @@
MAX_METRICS = 100
MAX_DIMENSIONS = 29

is_cold_start = True


class MetricUnit(Enum):
Seconds = "Seconds"
Expand Down Expand Up @@ -86,9 +91,9 @@ def __init__(
self.dimension_set = dimension_set if dimension_set is not None else {}
self.namespace = resolve_env_var_choice(choice=namespace, env=os.getenv(constants.METRICS_NAMESPACE_ENV))
self.service = resolve_env_var_choice(choice=service, env=os.getenv(constants.SERVICE_NAME_ENV))
self.metadata_set = metadata_set if metadata_set is not None else {}
self._metric_units = [unit.value for unit in MetricUnit]
self._metric_unit_options = list(MetricUnit.__members__)
self.metadata_set = metadata_set if metadata_set is not None else {}

def add_metric(self, name: str, unit: Union[MetricUnit, str], value: float) -> None:
"""Adds given metric
Expand Down Expand Up @@ -120,7 +125,7 @@ def add_metric(self, name: str, unit: Union[MetricUnit, str], value: float) -> N
if not isinstance(value, numbers.Number):
raise MetricValueError(f"{value} is not a valid number")

unit = self.__extract_metric_unit_value(unit=unit)
unit = self._extract_metric_unit_value(unit=unit)
metric: Dict = self.metric_set.get(name, defaultdict(list))
metric["Unit"] = unit
metric["Value"].append(float(value))
Expand Down Expand Up @@ -179,7 +184,7 @@ def serialize_metric_set(

if self.service and not self.dimension_set.get("service"):
# self.service won't be a float
self.add_dimension(name="service", value=self.service) # type: ignore[arg-type]
self.add_dimension(name="service", value=self.service)

if len(metrics) == 0:
raise SchemaValidationError("Must contain at least one metric.")
Expand Down Expand Up @@ -274,7 +279,86 @@ def add_metadata(self, key: str, value: Any) -> None:
else:
self.metadata_set[str(key)] = value

def __extract_metric_unit_value(self, unit: Union[str, MetricUnit]) -> str:
def clear_metrics(self) -> None:
logger.debug("Clearing out existing metric set from memory")
self.metric_set.clear()
self.dimension_set.clear()
self.metadata_set.clear()

def log_metrics(
self,
lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
capture_cold_start_metric: bool = False,
raise_on_empty_metrics: bool = False,
default_dimensions: Optional[Dict[str, str]] = None,
):
"""Decorator to serialize and publish metrics at the end of a function execution.

Be aware that the log_metrics **does call* the decorated function (e.g. lambda_handler).

Example
-------
**Lambda function using tracer and metrics decorators**

from aws_lambda_powertools import Metrics, Tracer

metrics = Metrics(service="payment")
tracer = Tracer(service="payment")

@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event, context):
...

Parameters
----------
lambda_handler : Callable[[Any, Any], Any], optional
lambda function handler, by default None
capture_cold_start_metric : bool, optional
captures cold start metric, by default False
raise_on_empty_metrics : bool, optional
raise exception if no metrics are emitted, by default False
default_dimensions: Dict[str, str], optional
metric dimensions as key=value that will always be present

Raises
------
e
Propagate error received
"""

# If handler is None we've been called with parameters
# Return a partial function with args filled
if lambda_handler is None:
logger.debug("Decorator called with parameters")
return functools.partial(
self.log_metrics,
capture_cold_start_metric=capture_cold_start_metric,
raise_on_empty_metrics=raise_on_empty_metrics,
default_dimensions=default_dimensions,
)

@functools.wraps(lambda_handler)
def decorate(event, context):
try:
if default_dimensions:
self.set_default_dimensions(**default_dimensions)
response = lambda_handler(event, context)
if capture_cold_start_metric:
self._add_cold_start_metric(context=context)
finally:
if not raise_on_empty_metrics and not self.metric_set:
warnings.warn("No metrics to publish, skipping")
else:
metrics = self.serialize_metric_set()
self.clear_metrics()
print(json.dumps(metrics, separators=(",", ":")))

return response

return decorate

def _extract_metric_unit_value(self, unit: Union[str, MetricUnit]) -> str:
"""Return metric value from metric unit whether that's str or MetricUnit enum

Parameters
Expand Down Expand Up @@ -306,3 +390,139 @@ def __extract_metric_unit_value(self, unit: Union[str, MetricUnit]) -> str:
unit = unit.value

return unit

def _add_cold_start_metric(self, context: Any) -> None:
"""Add cold start metric and function_name dimension

Parameters
----------
context : Any
Lambda context
"""
global is_cold_start
if is_cold_start:
logger.debug("Adding cold start metric and function_name dimension")
with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace=self.namespace) as metric:
metric.add_dimension(name="function_name", value=context.function_name)
if self.service:
metric.add_dimension(name="service", value=str(self.service))
is_cold_start = False


class SingleMetric(MetricManager):
"""SingleMetric creates an EMF object with a single metric.

EMF specification doesn't allow metrics with different dimensions.
SingleMetric overrides MetricManager's add_metric method to do just that.

Use `single_metric` when you need to create metrics with different dimensions,
otherwise `aws_lambda_powertools.metrics.metrics.Metrics` is
a more cost effective option

Environment variables
---------------------
POWERTOOLS_METRICS_NAMESPACE : str
metric namespace

Example
-------
**Creates cold start metric with function_version as dimension**

import json
from aws_lambda_powertools.metrics import single_metric, MetricUnit
metric = single_metric(namespace="ServerlessAirline")

metric.add_metric(name="ColdStart", unit=MetricUnit.Count, value=1)
metric.add_dimension(name="function_version", value=47)

print(json.dumps(metric.serialize_metric_set(), indent=4))

Parameters
----------
MetricManager : MetricManager
Inherits from `aws_lambda_powertools.metrics.base.MetricManager`
"""

def add_metric(self, name: str, unit: Union[MetricUnit, str], value: float) -> None:
"""Method to prevent more than one metric being created

Parameters
----------
name : str
Metric name (e.g. BookingConfirmation)
unit : MetricUnit
Metric unit (e.g. "Seconds", MetricUnit.Seconds)
value : float
Metric value
"""
if len(self.metric_set) > 0:
logger.debug(f"Metric {name} already set, skipping...")
return
return super().add_metric(name, unit, value)


@contextmanager
def single_metric(
name: str, unit: MetricUnit, value: float, namespace: Optional[str] = None
) -> Generator[SingleMetric, None, None]:
"""Context manager to simplify creation of a single metric

Example
-------
**Creates cold start metric with function_version as dimension**

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace="ServerlessAirline") as metric:
metric.add_dimension(name="function_version", value="47")

**Same as above but set namespace using environment variable**

$ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1) as metric:
metric.add_dimension(name="function_version", value="47")

Parameters
----------
name : str
Metric name
unit : MetricUnit
`aws_lambda_powertools.helper.models.MetricUnit`
value : float
Metric value
namespace: str
Namespace for metrics

Yields
-------
SingleMetric
SingleMetric class instance

Raises
------
MetricUnitError
When metric metric isn't supported by CloudWatch
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails EMF schema validation
"""
metric_set: Optional[Dict] = None
try:
metric: SingleMetric = SingleMetric(namespace=namespace)
metric.add_metric(name=name, unit=unit, value=value)
yield metric
metric_set = metric.serialize_metric_set()
finally:
print(json.dumps(metric_set, separators=(",", ":")))


def reset_cold_start_flag():
global is_cold_start
if not is_cold_start:
is_cold_start = True
Loading