Skip to content

Fixed: Cron expression resetting on update monitor #3655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/sagemaker/model_monitor/dataset_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def csv(header=True):
Args:
header (bool): Whether the csv dataset to baseline and monitor has a header.
Default: True.
output_columns_position (str): The position of the output columns.
Must be one of ("START", "END"). Default: "START".

Returns:
dict: JSON string containing DatasetFormat to be used by DefaultModelMonitor.
Expand Down
15 changes: 15 additions & 0 deletions src/sagemaker/model_monitor/model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,21 @@ def _update_data_quality_monitoring_schedule(
self._update_monitoring_schedule(self.job_definition_name, schedule_cron_expression)
return

existing_desc = self.sagemaker_session.describe_monitoring_schedule(
monitoring_schedule_name=self.monitoring_schedule_name
)

if (
existing_desc.get("MonitoringScheduleConfig") is not None
and existing_desc["MonitoringScheduleConfig"].get("ScheduleConfig") is not None
and existing_desc["MonitoringScheduleConfig"]["ScheduleConfig"]["ScheduleExpression"]
is not None
and schedule_cron_expression is None
):
schedule_cron_expression = existing_desc["MonitoringScheduleConfig"]["ScheduleConfig"][
"ScheduleExpression"
]

# Need to update schedule with a new job definition
job_desc = self.sagemaker_session.sagemaker_client.describe_data_quality_job_definition(
JobDefinitionName=self.job_definition_name
Expand Down
48 changes: 47 additions & 1 deletion tests/integ/test_model_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,49 @@ def test_default_monitor_create_and_update_schedule_config_without_customization
_wait_for_schedule_changes_to_apply(monitor=my_default_monitor)


@pytest.mark.skipif(
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
reason="ModelMonitoring is not yet supported in this region.",
)
def test_default_monitor_create_and_simple_update_schedule_config(sagemaker_session, predictor):
my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)

my_default_monitor.create_monitoring_schedule(
endpoint_input=predictor.endpoint_name,
schedule_cron_expression=CronExpressionGenerator.hourly(),
)

schedule_description = my_default_monitor.describe_schedule()
_verify_default_monitoring_schedule(
sagemaker_session=sagemaker_session,
schedule_description=schedule_description,
cron_expression=CronExpressionGenerator.hourly(),
)

_wait_for_schedule_changes_to_apply(my_default_monitor)

my_default_monitor.update_monitoring_schedule(
max_runtime_in_seconds=UPDATED_MAX_RUNTIME_IN_SECONDS
)

_wait_for_schedule_changes_to_apply(my_default_monitor)

schedule_description = my_default_monitor.describe_schedule()

_verify_default_monitoring_schedule(
sagemaker_session=sagemaker_session,
schedule_description=schedule_description,
cron_expression=CronExpressionGenerator.hourly(),
max_runtime_in_seconds=UPDATED_MAX_RUNTIME_IN_SECONDS,
)

_wait_for_schedule_changes_to_apply(monitor=my_default_monitor)

my_default_monitor.stop_monitoring_schedule()

_wait_for_schedule_changes_to_apply(monitor=my_default_monitor)


@pytest.mark.skipif(
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
reason="ModelMonitoring is not yet supported in this region.",
Expand Down Expand Up @@ -1644,7 +1687,10 @@ def _verify_default_monitoring_schedule(
schedule_description["MonitoringScheduleConfig"]["ScheduleConfig"]["ScheduleExpression"]
== cron_expression
)
assert schedule_description["MonitoringType"] == "DataQuality"
if schedule_description.get("MonitoringType") is not None:
assert schedule_description["MonitoringType"] == "DataQuality"
else:
assert schedule_description["MonitoringScheduleConfig"]["MonitoringType"] == "DataQuality"
job_definition_name = schedule_description["MonitoringScheduleConfig"].get(
"MonitoringJobDefinitionName"
)
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/sagemaker/monitor/test_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
PROBLEM_TYPE = "Regression"
GROUND_TRUTH_ATTRIBUTE = "TestAttribute"


CRON_DAILY = CronExpressionGenerator.daily()
BASELINING_JOB_NAME = "baselining-job"
BASELINE_DATASET_PATH = "/my/local/path/baseline.csv"
PREPROCESSOR_PATH = "/my/local/path/preprocessor.py"
Expand All @@ -84,7 +84,9 @@
MONITORING_SCHEDULE_DESC = {
"MonitoringScheduleArn": "arn:aws:monitoring-schedule",
"MonitoringScheduleName": "my-monitoring-schedule",
"MonitoringScheduleStatus": "Completed",
"MonitoringScheduleConfig": {
"ScheduleExpression": CRON_DAILY,
"MonitoringJobDefinition": {
"MonitoringOutputConfig": {},
"MonitoringResources": {
Expand All @@ -101,7 +103,7 @@
],
},
"RoleArn": ROLE,
}
},
},
"EndpointName": "my-endpoint",
}
Expand Down Expand Up @@ -282,7 +284,6 @@
security_group_ids=NEW_SECURITY_GROUP_IDS,
subnets=NEW_SUBNETS,
)
CRON_DAILY = CronExpressionGenerator.daily()
NEW_ENDPOINT_NAME = "new-endpoint"
NEW_GROUND_TRUTH_S3_URI = "s3://bucket/monitoring_captured/groundtruth"
NEW_START_TIME_OFFSET = "-PT2H"
Expand Down Expand Up @@ -858,6 +859,7 @@ def _test_data_quality_monitor_update_schedule(data_quality_monitor, sagemaker_s
)
sagemaker_session.sagemaker_client.create_data_quality_job_definition = Mock()
sagemaker_session.expand_role = Mock(return_value=NEW_ROLE_ARN)
sagemaker_session.describe_monitoring_schedule = Mock(return_value=MONITORING_SCHEDULE_DESC)
old_job_definition_name = data_quality_monitor.job_definition_name
data_quality_monitor.update_monitoring_schedule(role=NEW_ROLE_ARN)
expected_arguments = {
Expand Down