10
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
11
# ANY KIND, either express or implied. See the License for the specific
12
12
# language governing permissions and limitations under the License.
13
- """This module contains code related to Amazon SageMaker Monitoring Schedules . These
14
- classes assist with suggesting baselines and creating monitoring schedules for data captured
13
+ """This module contains code related to Amazon SageMaker Model Monitoring . These classes
14
+ assist with suggesting baselines and creating monitoring schedules for data captured
15
15
by SageMaker Endpoints.
16
16
"""
17
17
from __future__ import print_function , absolute_import
18
18
19
+ import copy
19
20
import json
20
21
import os
21
22
import logging
35
36
from sagemaker .processing import ProcessingJob
36
37
from sagemaker .processing import ProcessingInput
37
38
from sagemaker .processing import ProcessingOutput
38
- from sagemaker .model_monitor .cron_expression_generator import CronExpressionGenerator
39
39
from sagemaker .model_monitor .monitoring_files import Constraints , ConstraintViolations
40
40
from sagemaker .model_monitor .monitoring_files import Statistics
41
41
from sagemaker .exceptions import UnexpectedStatusException
42
+ from sagemaker .utils import retries
42
43
43
44
_DEFAULT_MONITOR_IMAGE_URI_WITH_PLACEHOLDERS = (
44
45
"{}.dkr.ecr.{}.amazonaws.com/sagemaker-model-monitor-analyzer"
58
59
"ap-northeast-2" : "709848358524" ,
59
60
"eu-west-2" : "749857270468" ,
60
61
"ap-northeast-1" : "574779866223" ,
61
- "us-west-2" : "159807026194" , # Prod
62
- # "us-west-2": "894667893881", # Gamma. # TODO-reinvent-2019 [knakad]: Remove this.
62
+ "us-west-2" : "159807026194" ,
63
63
"us-west-1" : "890145073186" ,
64
64
"ap-southeast-1" : "245545462676" ,
65
65
"ap-southeast-2" : "563025443158" ,
86
86
87
87
_SUGGESTION_JOB_BASE_NAME = "baseline-suggestion-job"
88
88
_MONITORING_SCHEDULE_BASE_NAME = "monitoring-schedule"
89
- _SCHEDULE_NAME_SUFFIX = "monitoring-schedule"
90
89
91
90
_DATASET_SOURCE_PATH_ENV_NAME = "dataset_source"
92
91
_DATASET_FORMAT_ENV_NAME = "dataset_format"
96
95
_PUBLISH_CLOUDWATCH_METRICS_ENV_NAME = "publish_cloudwatch_metrics"
97
96
98
97
_LOGGER = logging .getLogger (__name__ )
99
- # TODO-reinvent-2019 [knakad]: Review all docstrings.
100
98
101
99
102
100
class ModelMonitor (object ):
@@ -239,8 +237,8 @@ def create_monitoring_schedule(
239
237
output ,
240
238
statistics = None ,
241
239
constraints = None ,
242
- monitor_schedule_name = None , # TODO-reinvent-2019 [knakad]: Change to mon_sched_name evwhere
243
- schedule_cron_expression = CronExpressionGenerator . hourly () ,
240
+ monitor_schedule_name = None ,
241
+ schedule_cron_expression = None ,
244
242
):
245
243
"""Creates a monitoring schedule to monitor an Amazon SageMaker Endpoint.
246
244
@@ -266,9 +264,18 @@ def create_monitoring_schedule(
266
264
a default job name, based on the image name and current timestamp.
267
265
schedule_cron_expression (str): The cron expression that dictates the frequency that
268
266
this job runs at. See sagemaker.model_monitor.CronExpressionGenerator for valid
269
- expressions.
267
+ expressions. Default: Daily.
270
268
271
269
"""
270
+ if self .monitoring_schedule_name is not None :
271
+ message = (
272
+ "It seems that this object was already used to create an Amazon Model "
273
+ "Monitoring Schedule. To create another, first delete the existing one "
274
+ "using my_monitor.delete_monitoring_schedule()."
275
+ )
276
+ print (message )
277
+ raise ValueError (message )
278
+
272
279
self .monitoring_schedule_name = self ._generate_monitoring_schedule_name (
273
280
schedule_name = monitor_schedule_name
274
281
)
@@ -474,23 +481,30 @@ def update_monitoring_schedule(
474
481
role_arn = role ,
475
482
)
476
483
484
+ self ._wait_for_schedule_changes_to_apply ()
485
+
477
486
def start_monitoring_schedule(self):
    """Starts the monitoring schedule and waits for the change to take effect.

    The start request is sent through this monitor's SageMaker session for the
    schedule named by ``self.monitoring_schedule_name``. The call then blocks in
    ``_wait_for_schedule_changes_to_apply`` until the schedule is no longer
    reported as 'Pending'.

    """
    session = self.sagemaker_session
    session.start_monitoring_schedule(monitoring_schedule_name=self.monitoring_schedule_name)

    # Do not return while the schedule is still transitioning.
    self._wait_for_schedule_changes_to_apply()
493
+
483
494
def stop_monitoring_schedule(self):
    """Stops the monitoring schedule and waits for the change to take effect.

    The stop request is sent through this monitor's SageMaker session for the
    schedule named by ``self.monitoring_schedule_name``. The call then blocks in
    ``_wait_for_schedule_changes_to_apply`` until the schedule is no longer
    reported as 'Pending'.

    """
    session = self.sagemaker_session
    session.stop_monitoring_schedule(monitoring_schedule_name=self.monitoring_schedule_name)

    # Do not return while the schedule is still transitioning.
    self._wait_for_schedule_changes_to_apply()
501
+
489
502
def delete_monitoring_schedule(self):
    """Deletes the monitoring schedule and detaches it from this monitor.

    The delete request is sent through this monitor's SageMaker session for the
    schedule named by ``self.monitoring_schedule_name``. Afterwards the stored
    schedule name is reset to ``None``, which is the condition
    ``create_monitoring_schedule`` checks before allowing a new schedule to be
    created from this object.

    """
    session = self.sagemaker_session
    session.delete_monitoring_schedule(monitoring_schedule_name=self.monitoring_schedule_name)

    # Forget the deleted schedule so this monitor can create another one.
    self.monitoring_schedule_name = None
494
508
495
509
def baseline_statistics (self , file_name = STATISTICS_JSON_DEFAULT_FILE_NAME ):
496
510
"""Returns a Statistics object representing the statistics json file generated by the
@@ -665,7 +679,7 @@ def attach(cls, monitor_schedule_name, sagemaker_session=None):
665
679
]["ClusterConfig" ]["InstanceType" ]
666
680
entrypoint = schedule_desc ["MonitoringScheduleConfig" ]["MonitoringJobDefinition" ][
667
681
"MonitoringAppSpecification"
668
- ][ "ContainerEntrypoint" ]
682
+ ]. get ( "ContainerEntrypoint" )
669
683
volume_size_in_gb = schedule_desc ["MonitoringScheduleConfig" ]["MonitoringJobDefinition" ][
670
684
"MonitoringResources"
671
685
]["ClusterConfig" ]["VolumeSizeInGB" ]
@@ -744,7 +758,7 @@ def _generate_baselining_job_name(self, job_name=None):
744
758
return job_name
745
759
746
760
if self .base_job_name :
747
- base_name = "{}-{}" . format ( self .base_job_name , _SCHEDULE_NAME_SUFFIX )
761
+ base_name = self .base_job_name
748
762
else :
749
763
base_name = _SUGGESTION_JOB_BASE_NAME
750
764
@@ -932,6 +946,20 @@ def _s3_uri_from_local_path(self, path):
932
946
path = os .path .join (s3_uri , os .path .basename (path ))
933
947
return path
934
948
949
def _wait_for_schedule_changes_to_apply(self):
    """Blocks until this monitor's schedule is no longer in the 'Pending' state.

    The schedule is described repeatedly, up to 36 attempts spaced 5 seconds
    apart (about 3 minutes in total), and the method returns as soon as the
    reported ``MonitoringScheduleStatus`` is anything other than 'Pending'.

    NOTE(review): what happens once the attempts run out is decided by
    ``sagemaker.utils.retries`` (presumably it raises with the message prefix
    given below) -- confirm against that helper.

    """
    polling_attempts = retries(
        max_retry_count=36,  # 36*5 = 3min
        exception_message_prefix="Waiting for schedule to leave 'Pending' status",
        seconds_to_sleep=5,
    )
    for _attempt in polling_attempts:
        current_status = self.describe_schedule()["MonitoringScheduleStatus"]
        if current_status != "Pending":
            return
962
+
935
963
936
964
class DefaultModelMonitor (ModelMonitor ):
937
965
"""Sets up Amazon SageMaker Monitoring Schedules and baseline suggestions. Use this class when
@@ -1088,7 +1116,6 @@ def suggest_baseline(
1088
1116
dataset_format = dataset_format ,
1089
1117
output_path = normalized_baseline_output .source ,
1090
1118
enable_cloudwatch_metrics = False , # Only supported for monitoring schedules
1091
- # TODO-reinvent-2019 [knakad]: Remove this once API stops failing if not provided.
1092
1119
dataset_source_container_path = baseline_dataset_container_path ,
1093
1120
record_preprocessor_script_container_path = record_preprocessor_script_container_path ,
1094
1121
post_processor_script_container_path = post_processor_script_container_path ,
@@ -1147,8 +1174,7 @@ def create_monitoring_schedule(
1147
1174
constraints = None ,
1148
1175
statistics = None ,
1149
1176
monitor_schedule_name = None ,
1150
- schedule_cron_expression = CronExpressionGenerator .hourly (),
1151
- # TODO-reinvent-2019 [knakad]: Service to default this to daily at a random hour
1177
+ schedule_cron_expression = None ,
1152
1178
enable_cloudwatch_metrics = True ,
1153
1179
):
1154
1180
"""Creates a monitoring schedule to monitor an Amazon SageMaker Endpoint.
@@ -1179,11 +1205,20 @@ def create_monitoring_schedule(
1179
1205
a default job name, based on the image name and current timestamp.
1180
1206
schedule_cron_expression (str): The cron expression that dictates the frequency that
1181
1207
this job run. See sagemaker.model_monitor.CronExpressionGenerator for valid
1182
- expressions.
1208
+ expressions. Default: Daily.
1183
1209
enable_cloudwatch_metrics (bool): Whether to publish cloudwatch metrics as part of
1184
1210
the baselining or monitoring jobs.
1185
1211
1186
1212
"""
1213
+ if self .monitoring_schedule_name is not None :
1214
+ message = (
1215
+ "It seems that this object was already used to create an Amazon Model "
1216
+ "Monitoring Schedule. To create another, first delete the existing one "
1217
+ "using my_monitor.delete_monitoring_schedule()."
1218
+ )
1219
+ print (message )
1220
+ raise ValueError (message )
1221
+
1187
1222
self .monitoring_schedule_name = self ._generate_monitoring_schedule_name (
1188
1223
schedule_name = monitor_schedule_name
1189
1224
)
@@ -1354,12 +1389,7 @@ def update_monitoring_schedule(
1354
1389
self .env = env
1355
1390
1356
1391
normalized_env = self ._generate_env_map (
1357
- env = env ,
1358
- # dataset_format=DatasetFormat.sagemaker_capture_json(),
1359
- output_path = output_path ,
1360
- enable_cloudwatch_metrics = enable_cloudwatch_metrics ,
1361
- # record_preprocessor_script_input=record_preprocessor_script_input,
1362
- # post_analytics_processor_script_input=post_analytics_processor_script_input,
1392
+ env = env , output_path = output_path , enable_cloudwatch_metrics = enable_cloudwatch_metrics
1363
1393
)
1364
1394
1365
1395
statistics_object , constraints_object = self ._get_baseline_files (
@@ -1422,6 +1452,8 @@ def update_monitoring_schedule(
1422
1452
role_arn = role ,
1423
1453
)
1424
1454
1455
+ self ._wait_for_schedule_changes_to_apply ()
1456
+
1425
1457
def run_baseline (self ):
1426
1458
"""'.run_baseline()' is only allowed for ModelMonitor objects. Please use suggest_baseline
1427
1459
for DefaultModelMonitor objects, instead."""
@@ -1569,8 +1601,8 @@ def latest_monitoring_constraint_violations(self):
1569
1601
except ClientError :
1570
1602
status = latest_monitoring_execution .describe ()["ProcessingJobStatus" ]
1571
1603
print (
1572
- "Unable to retrieve statistics as job is in status '{}'. Latest violations only "
1573
- "available for completed executions." .format (status )
1604
+ "Unable to retrieve constraint violations as job is in status '{}'. Latest "
1605
+ "violations only available for completed executions." .format (status )
1574
1606
)
1575
1607
1576
1608
def _normalize_baseline_output (self , output_s3_uri = None ):
@@ -1649,7 +1681,7 @@ def _generate_env_map(
1649
1681
cloudwatch_env_map = {True : "Enabled" , False : "Disabled" }
1650
1682
1651
1683
if env is not None :
1652
- env = env . copy ( )
1684
+ env = copy . deepcopy ( env )
1653
1685
env = env or {}
1654
1686
1655
1687
if output_path is not None :
@@ -1672,12 +1704,6 @@ def _generate_env_map(
1672
1704
if dataset_source_container_path is not None :
1673
1705
env [_DATASET_SOURCE_PATH_ENV_NAME ] = dataset_source_container_path
1674
1706
1675
- # if dataset_source_input is not None:
1676
- # dataset_source_input_container_path = os.path.join(
1677
- # dataset_source_input.destination, os.path.basename(dataset_source_input.source)
1678
- # )
1679
- # env[_DATASET_SOURCE_PATH_ENV_NAME] = dataset_source_input_container_path
1680
-
1681
1707
return env
1682
1708
1683
1709
def _upload_and_convert_to_processing_input (self , source , destination , name ):
@@ -1808,7 +1834,7 @@ def baseline_statistics(self, file_name=STATISTICS_JSON_DEFAULT_FILE_NAME, kms_k
1808
1834
actual_status = status ,
1809
1835
)
1810
1836
else :
1811
- raise
1837
+ raise client_error
1812
1838
1813
1839
def suggested_constraints (self , file_name = CONSTRAINTS_JSON_DEFAULT_FILE_NAME , kms_key = None ):
1814
1840
"""Returns a sagemaker.model_monitor.Constraints object representing the constraints
@@ -1845,7 +1871,7 @@ def suggested_constraints(self, file_name=CONSTRAINTS_JSON_DEFAULT_FILE_NAME, km
1845
1871
actual_status = status ,
1846
1872
)
1847
1873
else :
1848
- raise
1874
+ raise client_error
1849
1875
1850
1876
1851
1877
class MonitoringExecution (ProcessingJob ):
@@ -1956,7 +1982,7 @@ def statistics(self, file_name=STATISTICS_JSON_DEFAULT_FILE_NAME, kms_key=None):
1956
1982
actual_status = status ,
1957
1983
)
1958
1984
else :
1959
- raise
1985
+ raise client_error
1960
1986
1961
1987
def constraint_violations (
1962
1988
self , file_name = CONSTRAINT_VIOLATIONS_JSON_DEFAULT_FILE_NAME , kms_key = None
@@ -1997,7 +2023,7 @@ def constraint_violations(
1997
2023
actual_status = status ,
1998
2024
)
1999
2025
else :
2000
- raise
2026
+ raise client_error
2001
2027
2002
2028
2003
2029
class EndpointInput (object ):
0 commit comments