Skip to content

fix: tag permission issue - remove describe before create #3662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/sagemaker/experiments/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import time

from botocore.exceptions import ClientError

from sagemaker.apiutils import _base_types
from sagemaker.experiments.trial import _Trial
from sagemaker.experiments.trial_component import _TrialComponent
Expand Down Expand Up @@ -154,17 +156,21 @@ def _load_or_create(
Returns:
experiments.experiment._Experiment: A SageMaker `_Experiment` object
"""
sagemaker_client = sagemaker_session.sagemaker_client
try:
experiment = _Experiment.load(experiment_name, sagemaker_session)
except sagemaker_client.exceptions.ResourceNotFound:
experiment = _Experiment.create(
experiment_name=experiment_name,
display_name=display_name,
description=description,
tags=tags,
sagemaker_session=sagemaker_session,
)
except ClientError as ce:
error_code = ce.response["Error"]["Code"]
error_message = ce.response["Error"]["Message"]
if not (error_code == "ValidationException" and "already exists" in error_message):
raise ce
Comment on lines +168 to +171
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: these lines of code keeps repeating. May be good to extract them to a helper function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, a definite improvement

Copy link

@arkaprava08 arkaprava08 Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that. Also I don't like the idea of logic depending on the parsed error message. As error messages are prone to change and will break the logic

# already exists
experiment = _Experiment.load(experiment_name, sagemaker_session)
return experiment

def list_trials(self, created_before=None, created_after=None, sort_by=None, sort_order=None):
Expand Down
24 changes: 15 additions & 9 deletions src/sagemaker/experiments/trial.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"""Contains the Trial class."""
from __future__ import absolute_import

from botocore.exceptions import ClientError

from sagemaker.apiutils import _base_types
from sagemaker.experiments import _api_types
from sagemaker.experiments.trial_component import _TrialComponent
Expand Down Expand Up @@ -268,8 +270,20 @@ def _load_or_create(
Returns:
experiments.trial._Trial: A SageMaker `_Trial` object
"""
sagemaker_client = sagemaker_session.sagemaker_client
try:
trial = _Trial.create(
experiment_name=experiment_name,
trial_name=trial_name,
display_name=display_name,
tags=tags,
sagemaker_session=sagemaker_session,
)
except ClientError as ce:
error_code = ce.response["Error"]["Code"]
error_message = ce.response["Error"]["Message"]
if not (error_code == "ValidationException" and "already exists" in error_message):
raise ce
# already exists
trial = _Trial.load(trial_name, sagemaker_session)
if trial.experiment_name != experiment_name: # pylint: disable=no-member
raise ValueError(
Expand All @@ -278,12 +292,4 @@ def _load_or_create(
trial.experiment_name # pylint: disable=no-member
)
)
except sagemaker_client.exceptions.ResourceNotFound:
trial = _Trial.create(
experiment_name=experiment_name,
trial_name=trial_name,
display_name=display_name,
tags=tags,
sagemaker_session=sagemaker_session,
)
return trial
14 changes: 10 additions & 4 deletions src/sagemaker/experiments/trial_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import time

from botocore.exceptions import ClientError

from sagemaker.apiutils import _base_types
from sagemaker.experiments import _api_types
from sagemaker.experiments._api_types import TrialComponentSearchResult
Expand Down Expand Up @@ -326,16 +328,20 @@ def _load_or_create(
experiments.trial_component._TrialComponent: A SageMaker `_TrialComponent` object.
bool: A boolean variable indicating whether the trail component already exists
"""
sagemaker_client = sagemaker_session.sagemaker_client
is_existed = False
try:
run_tc = _TrialComponent.load(trial_component_name, sagemaker_session)
is_existed = True
except sagemaker_client.exceptions.ResourceNotFound:
run_tc = _TrialComponent.create(
trial_component_name=trial_component_name,
display_name=display_name,
tags=tags,
sagemaker_session=sagemaker_session,
)
except ClientError as ce:
error_code = ce.response["Error"]["Code"]
error_message = ce.response["Error"]["Message"]
if not (error_code == "ValidationException" and "already exists" in error_message):
raise ce
# already exists
run_tc = _TrialComponent.load(trial_component_name, sagemaker_session)
is_existed = True
return run_tc, is_existed
160 changes: 105 additions & 55 deletions src/sagemaker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3223,14 +3223,11 @@ def create_model_package_from_containers(

def submit(request):
if model_package_group_name is not None:
try:
self.sagemaker_client.describe_model_package_group(
ModelPackageGroupName=request["ModelPackageGroupName"]
)
except ClientError:
self.sagemaker_client.create_model_package_group(
_create_resource(
lambda: self.sagemaker_client.create_model_package_group(
ModelPackageGroupName=request["ModelPackageGroupName"]
)
)
return self.sagemaker_client.create_model_package(**request)

return self._intercept_create_request(
Expand Down Expand Up @@ -3918,42 +3915,40 @@ def endpoint_from_model_data(
name = name or name_from_image(image_uri)
model_vpc_config = vpc_utils.sanitize(model_vpc_config)

if _deployment_entity_exists(
lambda: self.sagemaker_client.describe_endpoint(EndpointName=name)
):
raise ValueError(
'Endpoint with name "{}" already exists; please pick a different name.'.format(name)
)
primary_container = container_def(
image_uri=image_uri,
model_data_url=model_s3_location,
env=model_environment_vars,
)

if not _deployment_entity_exists(
lambda: self.sagemaker_client.describe_model(ModelName=name)
):
primary_container = container_def(
image_uri=image_uri,
model_data_url=model_s3_location,
env=model_environment_vars,
)
self.create_model(
name=name, role=role, container_defs=primary_container, vpc_config=model_vpc_config
)
self.create_model(
name=name, role=role, container_defs=primary_container, vpc_config=model_vpc_config
)

data_capture_config_dict = None
if data_capture_config is not None:
data_capture_config_dict = data_capture_config._to_request_dict()

if not _deployment_entity_exists(
lambda: self.sagemaker_client.describe_endpoint_config(EndpointConfigName=name)
):
self.create_endpoint_config(
_create_resource(
lambda: self.create_endpoint_config(
name=name,
model_name=name,
initial_instance_count=initial_instance_count,
instance_type=instance_type,
accelerator_type=accelerator_type,
data_capture_config_dict=data_capture_config_dict,
)
)

# to make change backwards compatible
response = _create_resource(
lambda: self.create_endpoint(endpoint_name=name, config_name=name, wait=wait)
)
if not response:
raise ValueError(
'Endpoint with name "{}" already exists; please pick a different name.'.format(name)
)

self.create_endpoint(endpoint_name=name, config_name=name, wait=wait)
return name

def endpoint_from_production_variants(
Expand Down Expand Up @@ -5452,34 +5447,54 @@ def _deployment_entity_exists(describe_fn):
return False


def _create_resource(create_fn):
"""Call create function and accepts/pass when resource already exists.

This is a helper function to use an existing resource if found when creating.

Args:
create_fn: Create resource function.

Returns:
(bool): True if new resource was created, False if resource already exists.
"""
try:
create_fn()
# create function succeeded, resource does not exist already
return True
except ClientError as ce:
error_code = ce.response["Error"]["Code"]
error_message = ce.response["Error"]["Message"]
already_exists_exceptions = ["ValidationException", "ResourceInUse"]
already_exists_msg_patterns = ["Cannot create already existing", "already exists"]
if not (
error_code in already_exists_exceptions
and any(p in error_message for p in already_exists_msg_patterns)
):
raise ce
# no new resource created as resource already exists
return False


def _train_done(sagemaker_client, job_name, last_desc):
"""Placeholder docstring"""
in_progress_statuses = ["InProgress", "Created"]

for _ in retries(
max_retry_count=10, # 10*30 = 5min
exception_message_prefix="Waiting for schedule to leave 'Pending' status",
seconds_to_sleep=30,
):
try:
desc = sagemaker_client.describe_training_job(TrainingJobName=job_name)
status = desc["TrainingJobStatus"]
desc = sagemaker_client.describe_training_job(TrainingJobName=job_name)
status = desc["TrainingJobStatus"]

if secondary_training_status_changed(desc, last_desc):
print()
print(secondary_training_status_message(desc, last_desc), end="")
else:
print(".", end="")
sys.stdout.flush()
if secondary_training_status_changed(desc, last_desc):
print()
print(secondary_training_status_message(desc, last_desc), end="")
else:
print(".", end="")
sys.stdout.flush()

if status in in_progress_statuses:
return desc, False
if status in in_progress_statuses:
return desc, False

print()
return desc, True
except botocore.exceptions.ClientError as err:
if err.response["Error"]["Code"] == "AccessDeniedException":
pass
print()
return desc, True


def _processing_job_status(sagemaker_client, job_name):
Expand Down Expand Up @@ -5799,19 +5814,54 @@ def _deploy_done(sagemaker_client, endpoint_name):

def _wait_until_training_done(callable_fn, desc, poll=5):
"""Placeholder docstring"""
job_desc, finished = callable_fn(desc)
elapsed_time = 0
finished = None
job_desc = desc
while not finished:
time.sleep(poll)
job_desc, finished = callable_fn(job_desc)
try:
elapsed_time += poll
time.sleep(poll)
job_desc, finished = callable_fn(job_desc)
except botocore.exceptions.ClientError as err:
# For initial 5 mins we accept/pass AccessDeniedException.
# The reason is to await tag propagation to avoid false AccessDenied claims for an
# access policy based on resource tags, The caveat here is for true AccessDenied
# cases the routine will fail after 5 mins
if err.response["Error"]["Code"] == "AccessDeniedException" and elapsed_time <= 300:
LOGGER.warning(
"Received AccessDeniedException. This could mean the IAM role does not "
"have the resource permissions, in which case please add resource access "
"and retry. For cases where the role has tag based resource policy, "
"continuing to wait for tag propagation.."
)
continue
raise err
return job_desc


def _wait_until(callable_fn, poll=5):
"""Placeholder docstring"""
result = callable_fn()
elapsed_time = 0
result = None
while result is None:
time.sleep(poll)
result = callable_fn()
try:
elapsed_time += poll
time.sleep(poll)
result = callable_fn()
except botocore.exceptions.ClientError as err:
# For initial 5 mins we accept/pass AccessDeniedException.
# The reason is to await tag propagation to avoid false AccessDenied claims for an
# access policy based on resource tags, The caveat here is for true AccessDenied
# cases the routine will fail after 5 mins
if err.response["Error"]["Code"] == "AccessDeniedException" and elapsed_time <= 300:
LOGGER.warning(
"Received AccessDeniedException. This could mean the IAM role does not "
"have the resource permissions, in which case please add resource access "
"and retry. For cases where the role has tag based resource policy, "
"continuing to wait for tag propagation.."
)
continue
raise err
return result


Expand Down
19 changes: 16 additions & 3 deletions src/sagemaker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,17 @@ def retries(
)


def retry_with_backoff(callable_func, num_attempts=8):
def retry_with_backoff(callable_func, num_attempts=8, botocore_client_error_code=None):
"""Retry with backoff until maximum attempts are reached

Args:
callable_func (callable): The callable function to retry.
num_attempts (int): The maximum number of attempts to retry.
num_attempts (int): The maximum number of attempts to retry.(Default: 8)
botocore_client_error_code (str): The specific Botocore ClientError exception error code
on which to retry on.
If provided other exceptions will be raised directly w/o retry.
If not provided, retry on any exception.
(Default: None)
"""
if num_attempts < 1:
raise ValueError(
Expand All @@ -619,7 +624,15 @@ def retry_with_backoff(callable_func, num_attempts=8):
try:
return callable_func()
except Exception as ex: # pylint: disable=broad-except
if i == num_attempts - 1:
if not botocore_client_error_code or (
botocore_client_error_code
and isinstance(ex, botocore.exceptions.ClientError)
and ex.response["Error"]["Code"] # pylint: disable=no-member
== botocore_client_error_code
):
if i == num_attempts - 1:
raise ex
else:
raise ex
logger.error("Retrying in attempt %s, due to %s", (i + 1), str(ex))
time.sleep(2**i)
Expand Down
Loading