Skip to content

Commit 6f2cd5a

Browse files
nmadanNamrata Madan
and
Namrata Madan
authored
fix:ResourceConflictException from AWS Lambda on pipeline upsert (#3106)
Co-authored-by: Namrata Madan <[email protected]>
1 parent 79ecab2 commit 6f2cd5a

File tree

3 files changed

+62
-86
lines changed

3 files changed

+62
-86
lines changed

src/sagemaker/lambda_helper.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from io import BytesIO
1717
import zipfile
18+
import time
1819
from botocore.exceptions import ClientError
1920
from sagemaker.session import Session
2021

@@ -134,32 +135,35 @@ def update(self):
134135
Returns: boto3 response from Lambda's update_function method.
135136
"""
136137
lambda_client = _get_lambda_client(self.session)
137-
138-
if self.script is not None:
139-
try:
140-
response = lambda_client.update_function_code(
141-
FunctionName=self.function_name, ZipFile=_zip_lambda_code(self.script)
142-
)
143-
return response
144-
except ClientError as e:
145-
error = e.response["Error"]
146-
raise ValueError(error)
147-
else:
138+
retry_attempts = 7
139+
for i in range(retry_attempts):
148140
try:
149-
response = lambda_client.update_function_code(
150-
FunctionName=(self.function_name or self.function_arn),
151-
S3Bucket=self.s3_bucket,
152-
S3Key=_upload_to_s3(
153-
s3_client=_get_s3_client(self.session),
154-
function_name=self.function_name,
155-
zipped_code_dir=self.zipped_code_dir,
156-
s3_bucket=self.s3_bucket,
157-
),
158-
)
141+
if self.script is not None:
142+
response = lambda_client.update_function_code(
143+
FunctionName=self.function_name, ZipFile=_zip_lambda_code(self.script)
144+
)
145+
else:
146+
response = lambda_client.update_function_code(
147+
FunctionName=(self.function_name or self.function_arn),
148+
S3Bucket=self.s3_bucket,
149+
S3Key=_upload_to_s3(
150+
s3_client=_get_s3_client(self.session),
151+
function_name=self.function_name,
152+
zipped_code_dir=self.zipped_code_dir,
153+
s3_bucket=self.s3_bucket,
154+
),
155+
)
159156
return response
160157
except ClientError as e:
161158
error = e.response["Error"]
162-
raise ValueError(error)
159+
code = error["Code"]
160+
if code == "ResourceConflictException":
161+
if i == retry_attempts - 1:
162+
raise ValueError(error)
163+
# max wait time = 2**0 + 2**1 + .. + 2**6 = 127 seconds
164+
time.sleep(2**i)
165+
else:
166+
raise ValueError(error)
163167

164168
def upsert(self):
165169
"""Method to create a lambda function or update it if it already exists

src/sagemaker/workflow/pipeline.py

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -215,30 +215,33 @@ def upsert(
215215
Returns:
216216
response dict from service
217217
"""
218+
exists = True
218219
try:
219-
response = self.create(role_arn, description, tags, parallelism_config)
220+
self.describe()
220221
except ClientError as e:
221-
error = e.response["Error"]
222-
if (
223-
error["Code"] == "ValidationException"
224-
and "Pipeline names must be unique within" in error["Message"]
225-
):
226-
response = self.update(role_arn, description)
227-
if tags is not None:
228-
old_tags = self.sagemaker_session.sagemaker_client.list_tags(
229-
ResourceArn=response["PipelineArn"]
230-
)["Tags"]
231-
232-
tag_keys = [tag["Key"] for tag in tags]
233-
for old_tag in old_tags:
234-
if old_tag["Key"] not in tag_keys:
235-
tags.append(old_tag)
236-
237-
self.sagemaker_session.sagemaker_client.add_tags(
238-
ResourceArn=response["PipelineArn"], Tags=tags
239-
)
222+
err = e.response.get("Error", {})
223+
if err.get("Code", None) == "ResourceNotFound":
224+
exists = False
240225
else:
241-
raise
226+
raise e
227+
228+
if not exists:
229+
response = self.create(role_arn, description, tags, parallelism_config)
230+
else:
231+
response = self.update(role_arn, description)
232+
if tags is not None:
233+
old_tags = self.sagemaker_session.sagemaker_client.list_tags(
234+
ResourceArn=response["PipelineArn"]
235+
)["Tags"]
236+
237+
tag_keys = [tag["Key"] for tag in tags]
238+
for old_tag in old_tags:
239+
if old_tag["Key"] not in tag_keys:
240+
tags.append(old_tag)
241+
242+
self.sagemaker_session.sagemaker_client.add_tags(
243+
ResourceArn=response["PipelineArn"], Tags=tags
244+
)
242245
return response
243246

244247
def delete(self) -> Dict[str, Any]:
@@ -270,18 +273,6 @@ def start(
270273
Returns:
271274
A `_PipelineExecution` instance, if successful.
272275
"""
273-
exists = True
274-
try:
275-
self.describe()
276-
except ClientError:
277-
exists = False
278-
279-
if not exists:
280-
raise ValueError(
281-
"This pipeline is not associated with a Pipeline in SageMaker. "
282-
"Please invoke create() first before attempting to invoke start()."
283-
)
284-
285276
kwargs = dict(PipelineName=self.name)
286277
update_args(
287278
kwargs,

tests/unit/sagemaker/workflow/test_pipeline.py

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import pytest
1919

20-
from botocore.exceptions import ClientError
21-
2220
from mock import Mock
2321

2422
from sagemaker import s3
@@ -178,20 +176,15 @@ def test_large_pipeline_update(sagemaker_session_mock, role_arn):
178176

179177

180178
def test_pipeline_upsert(sagemaker_session_mock, role_arn):
181-
sagemaker_session_mock.side_effect = [
182-
ClientError(
183-
operation_name="CreatePipeline",
184-
error_response={
185-
"Error": {
186-
"Code": "ValidationException",
187-
"Message": "Pipeline names must be unique within ...",
188-
}
189-
},
190-
),
191-
{"PipelineArn": "mock_pipeline_arn"},
192-
[{"Key": "dummy", "Value": "dummy_tag"}],
193-
{},
194-
]
179+
sagemaker_session_mock.sagemaker_client.describe_pipeline.return_value = {
180+
"PipelineArn": "pipeline-arn"
181+
}
182+
sagemaker_session_mock.sagemaker_client.update_pipeline.return_value = {
183+
"PipelineArn": "pipeline-arn"
184+
}
185+
sagemaker_session_mock.sagemaker_client.list_tags.return_value = {
186+
"Tags": [{"Key": "dummy", "Value": "dummy_tag"}]
187+
}
195188

196189
pipeline = Pipeline(
197190
name="MyPipeline",
@@ -205,9 +198,9 @@ def test_pipeline_upsert(sagemaker_session_mock, role_arn):
205198
{"Key": "bar", "Value": "xyz"},
206199
]
207200
pipeline.upsert(role_arn=role_arn, tags=tags)
208-
assert sagemaker_session_mock.sagemaker_client.create_pipeline.called_with(
209-
PipelineName="MyPipeline", PipelineDefinition=pipeline.definition(), RoleArn=role_arn
210-
)
201+
202+
sagemaker_session_mock.sagemaker_client.create_pipeline.assert_not_called()
203+
211204
assert sagemaker_session_mock.sagemaker_client.update_pipeline.called_with(
212205
PipelineName="MyPipeline", PipelineDefinition=pipeline.definition(), RoleArn=role_arn
213206
)
@@ -273,18 +266,6 @@ def test_pipeline_start(sagemaker_session_mock):
273266
)
274267

275268

276-
def test_pipeline_start_before_creation(sagemaker_session_mock):
277-
sagemaker_session_mock.sagemaker_client.describe_pipeline.side_effect = ClientError({}, "bar")
278-
pipeline = Pipeline(
279-
name="MyPipeline",
280-
parameters=[ParameterString("alpha", "beta"), ParameterString("gamma", "delta")],
281-
steps=[],
282-
sagemaker_session=sagemaker_session_mock,
283-
)
284-
with pytest.raises(ValueError):
285-
pipeline.start()
286-
287-
288269
def test_pipeline_basic():
289270
parameter = ParameterString("MyStr")
290271
pipeline = Pipeline(

0 commit comments

Comments
 (0)