Skip to content

Commit 25997eb

Browse files
gwang111, NivekNey, andre-marcos-perez, and Raymond Liu
committed
feature: add verbose logging from cloudwatch for IR job (aws#790)
* feature: Add create inf rec api to session * save * fix error handling in submit. update docstring * add in job_name param * fix: type hint of PySparkProcessor __init__ (aws#3297) * fix: fix PySparkProcessor __init__ params type (aws#3354) * feature: add describe inf rec api to session * feature: add describe inf rec api to session * feature: add describe inf rec api to session * add verbose logging in wait_for_inference_recommendations_job * improve unit tests * switch to cloudwatch logs * add unit tests * fix unit test * make default logging level to Verbose * address comments in PR * address some comments * fix typo missing s * add initial print because it took a while for cw logstream to appear * print and conditional optimizations and update tests * optimize log print * fix polling for logs empty events * cover empty event case * add spacing Co-authored-by: Gary Wang <[email protected]> Co-authored-by: Kevin <[email protected]> Co-authored-by: André Perez <[email protected]> Co-authored-by: Raymond Liu <[email protected]>
1 parent 414a77c commit 25997eb

File tree

2 files changed

+307
-2
lines changed

2 files changed

+307
-2
lines changed

src/sagemaker/session.py

+149
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,12 @@
5353
# Maps raw status codes returned by SageMaker APIs to the canonical
# display form used when reporting job status to the user.
_STATUS_CODE_TABLE = {
    "COMPLETED": "Completed",
    "INPROGRESS": "InProgress",
    "IN_PROGRESS": "InProgress",
    "FAILED": "Failed",
    "STOPPED": "Stopped",
    "STOPPING": "Stopping",
    "STARTING": "Starting",
    "PENDING": "Pending",
}
6163

6264

@@ -4844,6 +4846,41 @@ def submit(request):
48444846
)
48454847
return job_name
48464848

4849+
def wait_for_inference_recommendations_job(
    self, job_name: str, poll: int = 120, log_level: str = "Verbose"
) -> Dict[str, Any]:
    """Wait for an Amazon SageMaker Inference Recommender job to complete.

    Args:
        job_name (str): Name of the Inference Recommender job to wait for.
        poll (int): Polling interval in seconds (default: 120).
        log_level (str): The level of verbosity for the logs.
            Can be "Quiet" or "Verbose" (default: "Verbose").

    Returns:
        (dict): Return value from the ``DescribeInferenceRecommendationsJob`` API.

    Raises:
        ValueError: If ``log_level`` is neither "Quiet" nor "Verbose".
        exceptions.CapacityError: If the Inference Recommender job fails with CapacityError.
        exceptions.UnexpectedStatusException: If the Inference Recommender job fails.
    """
    if log_level == "Quiet":
        # Quiet mode: poll the job status only, printing one progress
        # character per poll via the status helper.
        _wait_until(
            lambda: _describe_inference_recommendations_job_status(
                self.sagemaker_client, job_name
            ),
            poll,
        )
    elif log_level == "Verbose":
        # Verbose mode: stream the job's CloudWatch execution log until the
        # job reaches a terminal status.
        _display_inference_recommendations_job_steps_status(
            self, self.sagemaker_client, job_name
        )
    else:
        raise ValueError("log_level must be either Quiet or Verbose")
    # Final describe fetches the terminal description; _check_job_status
    # raises for FAILED/STOPPED outcomes.
    desc = _describe_inference_recommendations_job_status(self.sagemaker_client, job_name)
    self._check_job_status(job_name, desc, "Status")
    return desc
4883+
48474884

48484885
def get_model_package_args(
48494886
content_types,
@@ -5465,6 +5502,118 @@ def _create_model_package_status(sagemaker_client, model_package_name):
54655502
return desc
54665503

54675504

5505+
def _describe_inference_recommendations_job_status(sagemaker_client, job_name: str):
5506+
"""Describes the status of a job and returns the job description.
5507+
5508+
Args:
5509+
sagemaker_client (boto3.client.sagemaker): A SageMaker client.
5510+
job_name (str): The name of the job.
5511+
5512+
Returns:
5513+
dict: The job description, or None if the job is still in progress.
5514+
"""
5515+
inference_recommendations_job_status_codes = {
5516+
"PENDING": ".",
5517+
"IN_PROGRESS": ".",
5518+
"COMPLETED": "!",
5519+
"FAILED": "*",
5520+
"STOPPING": "_",
5521+
"STOPPED": "s",
5522+
}
5523+
in_progress_statuses = {"PENDING", "IN_PROGRESS", "STOPPING"}
5524+
5525+
desc = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
5526+
status = desc["Status"]
5527+
5528+
print(inference_recommendations_job_status_codes.get(status, "?"), end="", flush=True)
5529+
5530+
if status in in_progress_statuses:
5531+
return None
5532+
5533+
print("")
5534+
return desc
5535+
5536+
5537+
def _display_inference_recommendations_job_steps_status(
    sagemaker_session, sagemaker_client, job_name: str, poll: int = 60
):
    """Stream an Inference Recommender job's CloudWatch logs until it finishes.

    Polls the job's ``<job_name>/execution`` log stream and prints new events
    as they arrive, exiting once the job leaves an in-progress status and any
    trailing log events have been flushed.

    Args:
        sagemaker_session (sagemaker.session.Session): Session whose boto
            session is used to create the CloudWatch Logs client.
        sagemaker_client (boto3.client.sagemaker): A SageMaker client.
        job_name (str): The name of the Inference Recommender job.
        poll (int): Seconds to sleep between log polls (default: 60).
    """
    cloudwatch_client = sagemaker_session.boto_session.client("logs")
    in_progress_statuses = {"PENDING", "IN_PROGRESS", "STOPPING"}
    log_group_name = "/aws/sagemaker/InferenceRecommendationsJobs"
    log_stream_name = job_name + "/execution"

    # The log stream can take a while to be created after the job starts;
    # this helper retries until it appears.
    initial_logs_batch = get_log_events_for_inference_recommender(
        cloudwatch_client, log_group_name, log_stream_name
    )
    print(f"Retrieved logStream: {log_stream_name} from logGroup: {log_group_name}", flush=True)
    events = initial_logs_batch["events"]
    print(*[event["message"] for event in events], sep="\n", flush=True)

    # Only advance the pagination token when the initial batch had events;
    # otherwise the next poll re-reads from the start of the stream.
    next_forward_token = initial_logs_batch["nextForwardToken"] if events else None
    # After the job reaches a terminal status, allow one extra empty poll so
    # log events still propagating to CloudWatch are not dropped.
    flush_remaining = True
    while True:
        logs_batch = (
            cloudwatch_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                nextToken=next_forward_token,
            )
            if next_forward_token
            else cloudwatch_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name
            )
        )

        events = logs_batch["events"]

        desc = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
        status = desc["Status"]

        if not events:
            if status in in_progress_statuses:
                # Nothing new yet and the job is still running: wait and retry.
                time.sleep(poll)
                continue
            if flush_remaining:
                # Job just finished; poll one more time for trailing events.
                flush_remaining = False
                time.sleep(poll)
                continue

        next_forward_token = logs_batch["nextForwardToken"]
        print(*[event["message"] for event in events], sep="\n", flush=True)

        if status not in in_progress_statuses:
            break

        time.sleep(poll)
5589+
5590+
5591+
def get_log_events_for_inference_recommender(cw_client, log_group_name, log_stream_name):
    """Retrieve log events from the specified CloudWatch log group and log stream.

    Retries for up to five minutes while the log stream does not yet exist,
    since CloudWatch creates the stream some time after the job starts.

    Args:
        cw_client (boto3.client): A boto3 CloudWatch client.
        log_group_name (str): The name of the CloudWatch log group.
        log_stream_name (str): The name of the CloudWatch log stream.

    Returns:
        (dict): A dictionary containing log events from CloudWatch log group and log stream.

    Raises:
        botocore.exceptions.ClientError: For any CloudWatch error other than
            ``ResourceNotFoundException``.
    """
    print("Fetching logs from CloudWatch...", flush=True)
    for _ in retries(
        max_retry_count=30,  # 30*10 = 5min
        exception_message_prefix="Waiting for cloudwatch stream to appear. ",
        seconds_to_sleep=10,
    ):
        try:
            return cw_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name
            )
        except ClientError as e:
            # Only "stream not created yet" is expected and retried; any other
            # API error (AccessDenied, throttling, ...) must surface to the
            # caller instead of being silently swallowed.
            if e.response["Error"]["Code"] != "ResourceNotFoundException":
                raise
5615+
5616+
54685617
def _deploy_done(sagemaker_client, endpoint_name):
54695618
"""Placeholder docstring"""
54705619
hosting_status_codes = {

tests/unit/test_session.py

+158-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from mock import ANY, MagicMock, Mock, patch, call, mock_open
2424

2525
import sagemaker
26-
from sagemaker import TrainingInput, Session, get_execution_role
26+
from sagemaker import TrainingInput, Session, get_execution_role, exceptions
2727
from sagemaker.async_inference import AsyncInferenceConfig
2828
from sagemaker.session import (
2929
_tuning_job_status,
@@ -2267,7 +2267,6 @@ def test_train_done_in_progress(sagemaker_session):
22672267
"GenerateCandidateDefinitionsOnly": False,
22682268
}
22692269

2270-
22712270
COMPLETE_EXPECTED_AUTO_ML_JOB_ARGS = {
22722271
"AutoMLJobName": JOB_NAME,
22732272
"InputDataConfig": [
@@ -3112,3 +3111,160 @@ def test_create_inference_recommendations_job_propogate_other_exception(sagemake
31123111
)
31133112

31143113
assert "AccessDeniedException" in str(error)
3114+
3115+
3116+
# Side-effect sequence for the mocked CloudWatch get_log_events call: the
# stream does not exist on the first call (ResourceNotFoundException), then
# each subsequent poll returns exactly one log event.
DEFAULT_LOG_EVENTS_INFERENCE_RECOMMENDER = [
    MockBotoException("ResourceNotFoundException"),
    {"nextForwardToken": None, "events": [{"timestamp": 1, "message": "hi there #1"}]},
    {"nextForwardToken": None, "events": [{"timestamp": 2, "message": "hi there #2"}]},
    {"nextForwardToken": None, "events": [{"timestamp": 3, "message": "hi there #3"}]},
    {"nextForwardToken": None, "events": [{"timestamp": 4, "message": "hi there #4"}]},
]

# Like the default sequence, but with empty event batches interleaved to
# exercise the "flush remaining logs after completion" code path.
FLUSH_LOG_EVENTS_INFERENCE_RECOMMENDER = [
    MockBotoException("ResourceNotFoundException"),
    {"nextForwardToken": None, "events": [{"timestamp": 1, "message": "hi there #1"}]},
    {"nextForwardToken": None, "events": [{"timestamp": 2, "message": "hi there #2"}]},
    {"nextForwardToken": None, "events": []},
    {"nextForwardToken": None, "events": [{"timestamp": 3, "message": "hi there #3"}]},
    {"nextForwardToken": None, "events": []},
    {"nextForwardToken": None, "events": [{"timestamp": 4, "message": "hi there #4"}]},
]

# Canned DescribeInferenceRecommendationsJob responses keyed by job status.
INFERENCE_RECOMMENDATIONS_DESC_STATUS_PENDING = {"Status": "PENDING"}
INFERENCE_RECOMMENDATIONS_DESC_STATUS_IN_PROGRESS = {"Status": "IN_PROGRESS"}
INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED = {"Status": "COMPLETED"}
3137+
3138+
3139+
@pytest.fixture()
def sm_session_inference_recommender():
    """Session mock simulating a job that completes after two in-progress polls."""
    boto_mock = MagicMock(name="boto_session")
    boto_mock.client("logs").get_log_events.side_effect = DEFAULT_LOG_EVENTS_INFERENCE_RECOMMENDER

    session = sagemaker.Session(boto_session=boto_mock, sagemaker_client=MagicMock())

    statuses = [
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_PENDING,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_IN_PROGRESS,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED,
    ]
    session.sagemaker_client.describe_inference_recommendations_job.side_effect = statuses

    return session
3154+
3155+
3156+
@pytest.fixture()
def sm_session_inference_recommender_flush():
    """Session mock whose log polls include empty batches, to test log flushing."""
    boto_mock = MagicMock(name="boto_session")
    boto_mock.client("logs").get_log_events.side_effect = FLUSH_LOG_EVENTS_INFERENCE_RECOMMENDER

    session = sagemaker.Session(boto_session=boto_mock, sagemaker_client=MagicMock())

    statuses = [
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_PENDING,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_IN_PROGRESS,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_IN_PROGRESS,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED,
        INFERENCE_RECOMMENDATIONS_DESC_STATUS_COMPLETED,
    ]
    session.sagemaker_client.describe_inference_recommendations_job.side_effect = statuses

    return session
3173+
3174+
3175+
@patch("time.sleep")
3176+
def test_wait_for_inference_recommendations_job_completed(sleep, sm_session_inference_recommender):
3177+
assert (
3178+
sm_session_inference_recommender.wait_for_inference_recommendations_job(
3179+
JOB_NAME, log_level="Quiet"
3180+
)["Status"]
3181+
== "COMPLETED"
3182+
)
3183+
3184+
assert (
3185+
4
3186+
== sm_session_inference_recommender.sagemaker_client.describe_inference_recommendations_job.call_count
3187+
)
3188+
assert 2 == sleep.call_count
3189+
sleep.assert_has_calls([call(120), call(120)])
3190+
3191+
3192+
def test_wait_for_inference_recommendations_job_failed(sagemaker_session):
    """A FAILED job surfaces its failure reason via UnexpectedStatusException."""
    failed_desc = {
        "Status": "FAILED",
        "FailureReason": "Mock Failure Reason",
    }
    sagemaker_session.sagemaker_client.describe_inference_recommendations_job = Mock(
        name="describe_inference_recommendations_job",
        return_value=failed_desc,
    )

    with pytest.raises(exceptions.UnexpectedStatusException) as error:
        sagemaker_session.wait_for_inference_recommendations_job(JOB_NAME)

    assert "Mock Failure Reason" in str(error)
3207+
3208+
3209+
@patch("builtins.print")
3210+
@patch("time.sleep")
3211+
def test_wait_for_inference_recommendations_job_completed_verbose(
3212+
sleep, mock_print, sm_session_inference_recommender
3213+
):
3214+
assert (
3215+
sm_session_inference_recommender.wait_for_inference_recommendations_job(
3216+
JOB_NAME, log_level="Verbose"
3217+
)["Status"]
3218+
== "COMPLETED"
3219+
)
3220+
assert (
3221+
4
3222+
== sm_session_inference_recommender.sagemaker_client.describe_inference_recommendations_job.call_count
3223+
)
3224+
3225+
assert (
3226+
5 == sm_session_inference_recommender.boto_session.client("logs").get_log_events.call_count
3227+
)
3228+
3229+
assert 3 == sleep.call_count
3230+
sleep.assert_has_calls([call(10), call(60), call(60)])
3231+
3232+
assert 8 == mock_print.call_count
3233+
3234+
3235+
@patch("builtins.print")
3236+
@patch("time.sleep")
3237+
def test_wait_for_inference_recommendations_job_flush_completed(
3238+
sleep, mock_print, sm_session_inference_recommender_flush
3239+
):
3240+
assert (
3241+
sm_session_inference_recommender_flush.wait_for_inference_recommendations_job(
3242+
JOB_NAME, log_level="Verbose"
3243+
)["Status"]
3244+
== "COMPLETED"
3245+
)
3246+
assert (
3247+
6
3248+
== sm_session_inference_recommender_flush.sagemaker_client.describe_inference_recommendations_job.call_count
3249+
)
3250+
3251+
assert (
3252+
7
3253+
== sm_session_inference_recommender_flush.boto_session.client(
3254+
"logs"
3255+
).get_log_events.call_count
3256+
)
3257+
3258+
assert 5 == sleep.call_count
3259+
sleep.assert_has_calls([call(10), call(60), call(60), call(60), call(60)])
3260+
3261+
assert 8 == mock_print.call_count
3262+
3263+
3264+
def test_wait_for_inference_recommendations_job_invalid_log_level(sagemaker_session):
    """An unsupported log_level value is rejected with a ValueError."""
    with pytest.raises(ValueError) as error:
        sagemaker_session.wait_for_inference_recommendations_job(
            JOB_NAME, log_level="invalid_log_level"
        )

    assert "log_level must be either Quiet or Verbose" in str(error)

0 commit comments

Comments
 (0)