1
1
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
- # SPDX-License-Identifier: MIT-0
3
-
4
- # Permission is hereby granted, free of charge, to any person obtaining a copy
5
- # of this software and associated documentation files (the "Software"), to deal
6
- # in the Software without restriction, including without limitation the rights
7
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
- # copies of the Software, and to permit persons to whom the Software is
9
- # furnished to do so.
10
-
11
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14
- # AUTHORS OR COPYRIGHT OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
15
- # IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2
+ #
3
+ # Licensed under the Apache License, Version 2.0 (the "License"). You
4
+ # may not use this file except in compliance with the License. A copy of
5
+ # the License is located at
6
+ #
7
+ # http://aws.amazon.com/apache2.0/
8
+ #
9
+ # or in the "license" file accompanying this file. This file is
10
+ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
+ # ANY KIND, either express or implied. See the License for the specific
12
+ # language governing permissions and limitations under the License.
13
+ """Helper functions to retrieve job metrics from CloudWatch."""
14
+ from __future__ import absolute_import
16
15
17
16
from datetime import datetime , timedelta
18
17
from typing import Callable , List , Optional , Tuple , Dict , Any
19
18
import hashlib
20
19
import os
21
20
from pathlib import Path
22
21
22
+ import logging
23
23
import pandas as pd
24
24
import numpy as np
25
25
import boto3
26
- import logging
27
26
28
27
logger = logging .getLogger (__name__ )
29
28
@@ -58,16 +57,16 @@ def inner(*args: Any, **kwargs: Any) -> pd.DataFrame:
58
57
logger .debug ("H" , end = "" )
59
58
df ["ts" ] = pd .to_datetime (df ["ts" ])
60
59
df ["ts" ] = df ["ts" ].dt .tz_localize (None )
61
- df ["rel_ts" ] = pd .to_datetime (df ["rel_ts" ]) # pyright: ignore [reportIndexIssue, reportOptionalSubscript]
60
+ # pyright: ignore [reportIndexIssue, reportOptionalSubscript]
61
+ df ["rel_ts" ] = pd .to_datetime (df ["rel_ts" ])
62
62
df ["rel_ts" ] = df ["rel_ts" ].dt .tz_localize (None )
63
63
return df
64
64
except KeyError :
65
65
# Empty file leads to empty df, hence no df['ts'] possible
66
66
pass
67
67
# nosec b110 - doesn't matter why we could not load it.
68
68
except BaseException as e :
69
- logger .error ("\n Exception" , type (e ), e )
70
- pass # continue with calling the outer function
69
+ logger .error ("\n Exception: %s - %s" , type (e ), e )
71
70
72
71
logger .debug ("M" , end = "" )
73
72
df = outer (* args , ** kwargs )
@@ -82,6 +81,7 @@ def inner(*args: Any, **kwargs: Any) -> pd.DataFrame:
82
81
83
82
84
83
def _metric_data_query_tpl (metric_name : str , dim_name : str , dim_value : str ) -> Dict [str , Any ]:
84
+ """Returns a CloudWatch metric data query template."""
85
85
return {
86
86
"Id" : metric_name .lower ().replace (":" , "_" ).replace ("-" , "_" ),
87
87
"MetricStat" : {
@@ -100,18 +100,19 @@ def _metric_data_query_tpl(metric_name: str, dim_name: str, dim_value: str) -> D
100
100
101
101
102
102
def _get_metric_data (
103
- queries : List [Dict [str , Any ]],
104
- start_time : datetime ,
103
+ queries : List [Dict [str , Any ]],
104
+ start_time : datetime ,
105
105
end_time : datetime
106
106
) -> pd .DataFrame :
107
+ """Fetches CloudWatch metrics between timestamps and returns a DataFrame with selected columns."""
107
108
start_time = start_time - timedelta (hours = 1 )
108
109
end_time = end_time + timedelta (hours = 1 )
109
110
response = cw .get_metric_data (MetricDataQueries = queries , StartTime = start_time , EndTime = end_time )
110
111
111
112
df = pd .DataFrame ()
112
113
if "MetricDataResults" not in response :
113
114
return df
114
-
115
+
115
116
for metric_data in response ["MetricDataResults" ]:
116
117
values = metric_data ["Values" ]
117
118
ts = np .array (metric_data ["Timestamps" ], dtype = np .datetime64 )
@@ -130,11 +131,11 @@ def _get_metric_data(
130
131
131
132
@disk_cache
132
133
def _collect_metrics (
133
- dimensions : List [Tuple [str , str ]],
134
- start_time : datetime ,
134
+ dimensions : List [Tuple [str , str ]],
135
+ start_time : datetime ,
135
136
end_time : Optional [datetime ]
136
137
) -> pd .DataFrame :
137
-
138
+ """Collects SageMaker training job metrics from CloudWatch based on given dimensions and time range."""
138
139
df = pd .DataFrame ()
139
140
for dim_name , dim_value in dimensions :
140
141
response = cw .list_metrics (
@@ -158,8 +159,8 @@ def _collect_metrics(
158
159
159
160
160
161
def get_cw_job_metrics (
161
- job_name : str ,
162
- start_time : Optional [datetime ] = None ,
162
+ job_name : str ,
163
+ start_time : Optional [datetime ] = None ,
163
164
end_time : Optional [datetime ] = None
164
165
) -> pd .DataFrame :
165
166
"""Retrieves CloudWatch metrics for a SageMaker training job.
@@ -182,4 +183,4 @@ def get_cw_job_metrics(
182
183
# If not given, use reasonable defaults for start and end time
183
184
start_time = start_time or datetime .now () - timedelta (hours = 4 )
184
185
end_time = end_time or start_time + timedelta (hours = 4 )
185
- return _collect_metrics (dimensions , start_time , end_time )
186
+ return _collect_metrics (dimensions , start_time , end_time )
0 commit comments