datadog.py
from __future__ import annotations

import json
import logging
import numbers
import os
import re
import time
import warnings
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.metrics.exceptions import MetricValueError, SchemaValidationError
from aws_lambda_powertools.metrics.functions import is_metrics_disabled
from aws_lambda_powertools.metrics.provider import BaseProvider
from aws_lambda_powertools.metrics.provider.datadog.warnings import DatadogDataValidationWarning
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import resolve_env_var_choice

if TYPE_CHECKING:
    from aws_lambda_powertools.shared.types import AnyCallableT
    from aws_lambda_powertools.utilities.typing import LambdaContext

METRIC_NAME_REGEX = re.compile(r"^[a-zA-Z0-9_.]+$")

logger = logging.getLogger(__name__)

# Check if the Datadog layer (datadog_lambda package) is available
try:
    from datadog_lambda.metric import lambda_metric  # type: ignore
except ImportError:  # pragma: no cover
    lambda_metric = None  # pragma: no cover

DEFAULT_NAMESPACE = "default"


class DatadogProvider(BaseProvider):
    """
    DatadogProvider creates metrics asynchronously via the Datadog extension or exporter.

    **Use `aws_lambda_powertools.DatadogMetrics` to create and send metrics to Datadog.**

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace to be set for all metrics

    Raises
    ------
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails schema validation
    """

    def __init__(
        self,
        metric_set: list | None = None,
        namespace: str | None = None,
        flush_to_log: bool | None = None,
        default_tags: dict[str, Any] | None = None,
    ):
        self.metric_set = metric_set if metric_set is not None else []
        self.namespace = (
            resolve_env_var_choice(choice=namespace, env=os.getenv(constants.METRICS_NAMESPACE_ENV))
            or DEFAULT_NAMESPACE
        )
        self.default_tags = default_tags or {}
        self.flush_to_log = resolve_env_var_choice(choice=flush_to_log, env=os.getenv(constants.DATADOG_FLUSH_TO_LOG))
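
    # Illustrative sketch (not part of the library): constructor arguments fall back to
    # environment variables when omitted, via resolve_env_var_choice() above.
    #
    #   provider = DatadogProvider(namespace="powertools", flush_to_log=True)
    #   # With no namespace argument, POWERTOOLS_METRICS_NAMESPACE is used if set;
    #   # otherwise the namespace falls back to "default".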

    # adding name, value, timestamp, tags
    def add_metric(
        self,
        name: str,
        value: float,
        timestamp: int | None = None,
        **tags,
    ) -> None:
        """
        Add a metric to the metric set; used by the metrics class.

        Parameters
        ----------
        name: str
            Name/key for the metric
        value: float
            Value for the metric
        timestamp: int
            Unix timestamp for the metric; defaults to int(time.time())
        tags: **kwargs
            Keyword arguments serialized as Datadog tags, e.g. product='latte' becomes "product:latte"

        Examples
        --------
        >>> provider = DatadogProvider()
        >>>
        >>> provider.add_metric(
        >>>     name='coffee_house.order_value',
        >>>     value=12.45,
        >>>     product='latte',
        >>>     order='online',
        >>>     sales='sam',
        >>> )
        """
        # validating metric name
        if not self._validate_datadog_metric_name(name):
            docs = "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics"
            raise SchemaValidationError(
                f"Invalid metric name. Please ensure the metric {name} follows the requirements. \n"
                f"See Datadog documentation here: \n {docs}",
            )

        # validating metric tags
        self._validate_datadog_tags_name(tags)

        if not isinstance(value, numbers.Real):
            raise MetricValueError(f"{value} is not a valid number")

        if not timestamp:
            timestamp = int(time.time())

        logger.debug({"details": "Appending metric", "metrics": name})
        self.metric_set.append({"m": name, "v": value, "e": timestamp, "t": tags})

    def serialize_metric_set(self, metrics: list | None = None) -> list:
        """Serializes metrics

        Example
        -------
        **Serialize metrics into Datadog format**

            metrics = DatadogMetrics()

            # ...add metrics, tags, namespace
            ret = metrics.serialize_metric_set()

        Returns
        -------
        list
            Serialized metrics following the Datadog specification

        Raises
        ------
        SchemaValidationError
            Raised when serialization fails schema validation
        """
        if metrics is None:  # pragma: no cover
            metrics = self.metric_set

        if len(metrics) == 0:
            raise SchemaValidationError("Must contain at least one metric.")

        output_list: list = []

        logger.debug({"details": "Serializing metrics", "metrics": metrics})

        for single_metric in metrics:
            if self.namespace != DEFAULT_NAMESPACE:
                metric_name = f"{self.namespace}.{single_metric['m']}"
            else:
                metric_name = single_metric["m"]

            output_list.append(
                {
                    "m": metric_name,
                    "v": single_metric["v"],
                    "e": single_metric["e"],
                    "t": self._serialize_datadog_tags(metric_tags=single_metric["t"], default_tags=self.default_tags),
                },
            )

        return output_list
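
    # Illustrative sketch (not part of the library): given namespace "powertools" and
    # provider.add_metric(name="order_value", value=12.45, product="latte"),
    # serialize_metric_set() returns entries shaped like:
    #
    #   [{"m": "powertools.order_value", "v": 12.45, "e": 1700000000, "t": ["product:latte"]}]
    #
    # The timestamp above is a placeholder; the real value comes from time.time().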

    # flush serialized data to output
    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        """Manually flushes the metrics. This is normally not necessary on Lambda, where the
        @log_metrics decorator already handles flushing for you; other runtimes may need to
        call it explicitly.

        Parameters
        ----------
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False
        """
        if not raise_on_empty_metrics and len(self.metric_set) == 0:
            warnings.warn(
                "No application metrics to publish. The cold-start metric may be published if enabled. "
                "If application metrics should never be empty, consider using 'raise_on_empty_metrics'",
                stacklevel=2,
            )
        else:
            logger.debug("Flushing existing metrics")
            metrics = self.serialize_metric_set()

            # submit through the Datadog extension
            if lambda_metric and not self.flush_to_log:
                # use the lambda_metric function from the datadog_lambda package to submit metrics to Datadog
                for metric_item in metrics:  # pragma: no cover
                    lambda_metric(  # pragma: no cover
                        metric_name=metric_item["m"],
                        value=metric_item["v"],
                        timestamp=metric_item["e"],
                        tags=metric_item["t"],
                    )
            elif not is_metrics_disabled():
                # datadog_lambda module not found: flush to log; this format is recognized by the Datadog log forwarder
                # https://github.com/Datadog/datadog-lambda-python/blob/main/datadog_lambda/metric.py#L77
                for metric_item in metrics:
                    print(json.dumps(metric_item, separators=(",", ":")))

            self.clear_metrics()
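
    # Illustrative sketch (assumption, mirroring the print() call above): with flush_to_log
    # enabled, or when the Datadog layer is absent, each metric is emitted as one compact
    # JSON line on stdout for the Datadog log forwarder to pick up, e.g.:
    #
    #   {"m":"powertools.order_value","v":12.45,"e":1700000000,"t":["product:latte"]}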

    def clear_metrics(self):
        logger.debug("Clearing out existing metric set from memory")
        self.metric_set.clear()

    def add_cold_start_metric(self, context: LambdaContext) -> None:
        """Add cold start metric and function_name tag

        Parameters
        ----------
        context : LambdaContext
            Lambda context
        """
        logger.debug("Adding cold start metric and function_name tagging")
        self.add_metric(name="ColdStart", value=1, function_name=context.function_name)

    def log_metrics(
        self,
        lambda_handler: AnyCallableT | None = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        **kwargs,
    ):
        """Decorator to serialize and publish metrics at the end of a function execution.

        Be aware that log_metrics **does call** the decorated function (e.g. lambda_handler).

        Example
        -------
        **Lambda function using tracer and metrics decorators**

            from aws_lambda_powertools import Tracer
            from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

            metrics = DatadogMetrics(namespace="powertools")
            tracer = Tracer(service="payment")

            @tracer.capture_lambda_handler
            @metrics.log_metrics
            def handler(event, context):
                ...

        Parameters
        ----------
        lambda_handler : Callable[[Any, Any], Any], optional
            lambda function handler, by default None
        capture_cold_start_metric : bool, optional
            captures cold start metric, by default False
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False
        **kwargs
            additional options; supports `default_tags` (dict) applied to every metric

        Raises
        ------
        Exception
            Propagates any error raised by the decorated function
        """
        default_tags = kwargs.get("default_tags")

        if default_tags:
            self.set_default_tags(**default_tags)

        return super().log_metrics(
            lambda_handler=lambda_handler,
            capture_cold_start_metric=capture_cold_start_metric,
            raise_on_empty_metrics=raise_on_empty_metrics,
            **kwargs,
        )
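
    # Illustrative sketch (assumption, not confirmed by this file alone): default_tags can
    # be passed through the decorator itself, which routes them to set_default_tags() via
    # the kwargs.get("default_tags") lookup above.
    #
    #   metrics = DatadogMetrics(namespace="powertools")
    #
    #   @metrics.log_metrics(default_tags={"environment": "prod"})
    #   def handler(event, context):
    #       metrics.add_metric(name="order_value", value=12.45)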

    def set_default_tags(self, **tags) -> None:
        """Persist tags across Lambda invocations

        Parameters
        ----------
        tags : **kwargs
            tags as key=value

        Example
        -------
        **Sets default tags that will always be present across metrics and invocations**

            from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

            metrics = DatadogMetrics(namespace="ServerlessAirline")

            metrics.set_default_tags(environment="demo", another="one")

            @metrics.log_metrics()
            def lambda_handler(event, context):
                return True
        """
        self._validate_datadog_tags_name(tags)
        self.default_tags.update(**tags)

    @staticmethod
    def _serialize_datadog_tags(metric_tags: dict[str, Any], default_tags: dict[str, Any]) -> list[str]:
        """
        Serialize metric tags into a list of formatted strings for Datadog integration.

        This function merges default tags with metric-specific tags and converts them
        into a list of strings in the format "tag_key:tag_value".

        Parameters
        ----------
        metric_tags: dict[str, Any]
            A dictionary containing metric-specific tags.
        default_tags: dict[str, Any]
            A dictionary containing default tags applicable to all metrics.

        Returns
        -------
        list[str]
            A list of formatted tag strings, each in the "tag_key:tag_value" format.

        Example
        -------
        >>> metric_tags = {'environment': 'production', 'service': 'web'}
        >>> DatadogProvider._serialize_datadog_tags(metric_tags, {})
        ['environment:production', 'service:web']
        """
        # Combine default_tags first, then metric_tags on top, so that keys from
        # metric_tags take precedence and replace corresponding keys in default_tags.
        tags = {**default_tags, **metric_tags}

        return [f"{tag_key}:{tag_value}" for tag_key, tag_value in tags.items()]
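
    # Illustrative sketch (not part of the library): metric tags win on a key clash,
    # while untouched default tags are preserved.
    #
    #   DatadogProvider._serialize_datadog_tags(
    #       metric_tags={"environment": "staging"},
    #       default_tags={"environment": "prod", "team": "payments"},
    #   )
    #   # -> ['environment:staging', 'team:payments']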

    @staticmethod
    def _validate_datadog_tags_name(tags: dict):
        """
        Validate metric tags according to specific requirements.

        Metric tags must start with a letter.
        Metric tags must not exceed 200 characters. Fewer than 100 is preferred from a UI perspective.

        More information here: https://docs.datadoghq.com/getting_started/tagging/#define-tags

        Parameters
        ----------
        tags: dict
            The metric tags to be validated.
        """
        for tag_key, tag_value in tags.items():
            tag = f"{tag_key}:{tag_value}"
            if not tag[0].isalpha() or len(tag) > 200:
                docs = "https://docs.datadoghq.com/getting_started/tagging/#define-tags"
                warnings.warn(
                    f"Invalid tag value. Please ensure the specific tag {tag} follows the requirements. \n"
                    f"May incur data loss for metrics. \n"
                    f"See Datadog documentation here: \n {docs}",
                    DatadogDataValidationWarning,
                    stacklevel=2,
                )

    @staticmethod
    def _validate_datadog_metric_name(metric_name: str) -> bool:
        """
        Validate a metric name according to specific requirements.

        Metric names must start with a letter.
        Metric names must only contain ASCII alphanumerics, underscores, and periods.
        Other characters, including spaces, are converted to underscores.
        Unicode is not supported.
        Metric names must not exceed 200 characters. Fewer than 100 is preferred from a UI perspective.

        More information here: https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics

        Parameters
        ----------
        metric_name: str
            The metric name to be validated.

        Returns
        -------
        bool
            True if the metric name is valid, False otherwise.
        """
        # The name must start with a letter, contain at most 200 characters, and
        # include only ASCII alphanumerics, underscores, and periods.
        if not metric_name[0].isalpha() or len(metric_name) > 200 or not METRIC_NAME_REGEX.match(metric_name):
            return False

        return True
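
    # Illustrative sketch (not part of the library): expected validation outcomes.
    #
    #   DatadogProvider._validate_datadog_metric_name("coffee_house.order_value")  # True
    #   DatadogProvider._validate_datadog_metric_name("1st_metric")                # False: starts with a digit
    #   DatadogProvider._validate_datadog_metric_name("order value")               # False: space is not allowed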