Skip to content

Commit 59bace4

Browse files
keshav-chandakKeshav Chandak
and
Keshav Chandak
authored
feat: Support to get latest monitoring execution processing logs (#4036)
Co-authored-by: Keshav Chandak <[email protected]>
1 parent 3662c8d commit 59bace4

File tree

4 files changed

+117
-0
lines changed

4 files changed

+117
-0
lines changed

src/sagemaker/model_monitor/clarify_model_monitoring.py

+24
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from sagemaker.session import Session
2828
from sagemaker.utils import name_from_base
2929
from sagemaker.clarify import SageMakerClarifyProcessor, ModelPredictedLabelConfig
30+
from sagemaker.lineage._utils import get_resource_name_from_arn
3031

3132
_LOGGER = logging.getLogger(__name__)
3233

@@ -154,6 +155,29 @@ def list_executions(self):
154155
for execution in executions
155156
]
156157

158+
def get_latest_execution_logs(self, wait=False):
    """Show the processing job logs of the most recent monitoring execution.

    Lists the executions of this monitor's schedule through the SageMaker
    session, takes the first summary in the response, derives the processing
    job name from its ARN and forwards it to ``logs_for_processing_job``.

    Args:
        wait (bool): Whether the call should wait until the job completes
            (default: False). Passed through to ``logs_for_processing_job``.

    Raises:
        ValueError: If the schedule has no executions, or the first execution
            summary has no ``ProcessingJobArn`` entry.

    Returns:
        None
    """
    response = self.sagemaker_session.list_monitoring_executions(
        monitoring_schedule_name=self.monitoring_schedule_name
    )
    summaries = response["MonitoringExecutionSummaries"]
    if not summaries:
        raise ValueError("No execution jobs were kicked off.")

    # The first summary is treated as the latest execution.
    latest = summaries[0]
    if "ProcessingJobArn" not in latest:
        raise ValueError("Processing Job did not run for the last execution")

    processing_job_name = get_resource_name_from_arn(latest["ProcessingJobArn"])
    self.sagemaker_session.logs_for_processing_job(job_name=processing_job_name, wait=wait)
180+
157181
def _create_baselining_processor(self):
158182
"""Create and return a SageMakerClarifyProcessor object which will run the baselining job.
159183

src/sagemaker/model_monitor/model_monitoring.py

+24
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
resolve_value_from_config,
6464
resolve_class_attribute_from_config,
6565
)
66+
from sagemaker.lineage._utils import get_resource_name_from_arn
6667

6768
DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"
6869

@@ -768,6 +769,29 @@ def list_executions(self):
768769

769770
return monitoring_executions
770771

772+
def get_latest_execution_logs(self, wait=False):
    """Fetch the processing job logs for the latest monitoring execution.

    The executions of this monitor's schedule are listed via the SageMaker
    session; the first returned summary is used to locate the processing job
    whose logs are then requested with ``logs_for_processing_job``.

    Args:
        wait (bool): Whether the call should wait until the job completes
            (default: False).

    Raises:
        ValueError: If no execution exists for the schedule, or the first
            execution summary does not contain a ``ProcessingJobArn``.

    Returns:
        None
    """
    session = self.sagemaker_session
    execution_summaries = session.list_monitoring_executions(
        monitoring_schedule_name=self.monitoring_schedule_name
    )["MonitoringExecutionSummaries"]

    if not execution_summaries:
        raise ValueError("No execution jobs were kicked off.")

    newest_execution = execution_summaries[0]
    if "ProcessingJobArn" not in newest_execution:
        raise ValueError("Processing Job did not run for the last execution")

    # logs_for_processing_job is given the job name derived from the ARN.
    job_arn = newest_execution["ProcessingJobArn"]
    job_name = get_resource_name_from_arn(job_arn)
    session.logs_for_processing_job(job_name=job_name, wait=wait)
794+
771795
def update_monitoring_alert(
772796
self,
773797
monitoring_alert_name: str,

tests/integ/test_model_monitor.py

+44
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,38 @@ def test_default_monitor_suggest_baseline_and_create_monitoring_schedule_with_cu
486486
assert len(summary["MonitoringScheduleSummaries"]) > 0
487487

488488

489+
def test_default_monitor_display_logs_errors(sagemaker_session):
    """Check the error raised when logs are requested before any execution ran.

    Creates an hourly batch-transform monitoring schedule, waits for it to
    leave the pending state, then asks for the latest execution logs. If a
    ``ValueError`` is raised, it must carry the "no executions" message.

    The schedule is always stopped and deleted afterwards, even when the
    assertion fails or an unexpected exception is raised, so a failing run
    does not leave a live schedule behind.
    """
    my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)

    data_captured_destination_s3_uri = os.path.join(
        "s3://",
        sagemaker_session.default_bucket(),
        "sagemaker-serving-batch-transform",
        str(uuid.uuid4()),
    )

    batch_transform_input = BatchTransformInput(
        data_captured_destination_s3_uri=data_captured_destination_s3_uri,
        destination="/opt/ml/processing/output",
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    )

    my_default_monitor.create_monitoring_schedule(
        batch_transform_input=batch_transform_input,
        schedule_cron_expression=CronExpressionGenerator.hourly(),
    )

    _wait_for_schedule_changes_to_apply(monitor=my_default_monitor)

    try:
        # NOTE(review): a call that succeeds without raising is still tolerated,
        # as in the original test - confirm whether a ValueError should be mandatory.
        my_default_monitor.get_latest_execution_logs(wait=False)
    except ValueError as ve:
        assert "No execution jobs were kicked off." in str(ve)
    finally:
        # Clean up regardless of the outcome above.
        my_default_monitor.stop_monitoring_schedule()
        my_default_monitor.delete_monitoring_schedule()
519+
520+
489521
@pytest.mark.skipif(
490522
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
491523
reason="ModelMonitoring is not yet supported in this region.",
@@ -1643,6 +1675,7 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
16431675
output_kms_key,
16441676
updated_volume_kms_key,
16451677
updated_output_kms_key,
1678+
capfd,
16461679
):
16471680
baseline_dataset = os.path.join(DATA_DIR, "monitor/baseline_dataset.csv")
16481681

@@ -1771,6 +1804,10 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
17711804

17721805
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
17731806

1807+
_check_processing_logs_generated(
1808+
monitor=my_attached_monitor, schedule_description=schedule_description, capfd=capfd
1809+
)
1810+
17741811
my_attached_monitor.stop_monitoring_schedule()
17751812

17761813
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
@@ -1877,6 +1914,13 @@ def test_default_monitor_monitoring_alerts(sagemaker_session, predictor):
18771914
my_default_monitor.delete_monitoring_schedule()
18781915

18791916

1917+
def _check_processing_logs_generated(monitor, schedule_description, capfd):
    """Assert that fetching the latest execution logs wrote the job ARN to stdout.

    Args:
        monitor: Object exposing ``get_latest_execution_logs(wait=...)``.
        schedule_description (dict): Schedule description whose
            ``"LastMonitoringExecutionSummary"`` entry holds a ``"ProcessingJobArn"``.
        capfd: Capture object whose ``readouterr()`` returns an ``(out, err)`` pair.
    """
    monitor.get_latest_execution_logs(wait=False)

    captured_stdout, _captured_stderr = capfd.readouterr()
    assert len(captured_stdout) > 0

    last_execution = schedule_description.get("LastMonitoringExecutionSummary")
    expected_arn = last_execution["ProcessingJobArn"]
    assert expected_arn in captured_stdout
1922+
1923+
18801924
def _wait_for_schedule_changes_to_apply(monitor):
18811925
"""Waits for the monitor to no longer be in the 'Pending' state. Updates take under a minute
18821926
to apply.

tests/unit/sagemaker/monitor/test_clarify_model_monitor.py

+25
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@
221221
"NetworkConfig": NETWORK_CONFIG._to_request_dict(),
222222
}
223223

224+
# Canned ``list_monitoring_executions`` responses used by the log-retrieval tests.
# The schedule has no executions at all.
MONITORING_EXECUTIONS_EMPTY = {"MonitoringExecutionSummaries": []}

# One execution summary is present, but it has no "ProcessingJobArn" entry.
MONITORING_EXECUTIONS_NO_PROCESSING_JOB = {
    "MonitoringExecutionSummaries": [
        {"MonitoringSchedule": "MonitoringSchedule"},
    ],
}
231+
224232
# For update API
225233
NEW_ROLE_ARN = "arn:aws:iam::012345678902:role/{}".format(ROLE)
226234
NEW_INSTANCE_COUNT = 2
@@ -1716,3 +1724,20 @@ def _test_model_explainability_monitor_delete_schedule(
17161724
sagemaker_session.sagemaker_client.delete_model_explainability_job_definition.assert_called_once_with(
17171725
JobDefinitionName=job_definition_name
17181726
)
1727+
1728+
1729+
def test_model_explainability_monitor_logs_failure(model_explainability_monitor, sagemaker_session):
    """Verify ``get_latest_execution_logs`` raises ValueError for unusable executions.

    Two mocked ``list_monitoring_executions`` responses are exercised: one with
    no execution summaries and one whose only summary has no processing job ARN.
    Each must raise a ``ValueError`` carrying the matching message; the test
    fails if no exception is raised at all.
    """
    failure_cases = (
        (MONITORING_EXECUTIONS_EMPTY, "No execution jobs were kicked off."),
        (
            MONITORING_EXECUTIONS_NO_PROCESSING_JOB,
            "Processing Job did not run for the last execution",
        ),
    )
    for mocked_response, expected_message in failure_cases:
        sagemaker_session.list_monitoring_executions = MagicMock(return_value=mocked_response)
        try:
            model_explainability_monitor.get_latest_execution_logs()
        except ValueError as ve:
            assert expected_message in str(ve)
        else:
            # Without this branch the test would pass silently when nothing is raised.
            raise AssertionError("Expected ValueError: {}".format(expected_message))

0 commit comments

Comments
 (0)