Skip to content

Commit 8199447

Browse files
author
Namrata Madan
committed
fix:ResourceConflictException from AWS lambda on pipeline upsert
1 parent 92b1d47 commit 8199447

File tree

3 files changed

+59
-61
lines changed

3 files changed

+59
-61
lines changed

src/sagemaker/lambda_helper.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from io import BytesIO
1717
import zipfile
18+
import time
1819
from botocore.exceptions import ClientError
1920
from sagemaker.session import Session
2021

@@ -134,32 +135,35 @@ def update(self):
134135
Returns: boto3 response from Lambda's update_function method.
135136
"""
136137
lambda_client = _get_lambda_client(self.session)
137-
138-
if self.script is not None:
139-
try:
140-
response = lambda_client.update_function_code(
141-
FunctionName=self.function_name, ZipFile=_zip_lambda_code(self.script)
142-
)
143-
return response
144-
except ClientError as e:
145-
error = e.response["Error"]
146-
raise ValueError(error)
147-
else:
138+
retry_attempts = 7
139+
for i in range(retry_attempts):
148140
try:
149-
response = lambda_client.update_function_code(
150-
FunctionName=(self.function_name or self.function_arn),
151-
S3Bucket=self.s3_bucket,
152-
S3Key=_upload_to_s3(
153-
s3_client=_get_s3_client(self.session),
154-
function_name=self.function_name,
155-
zipped_code_dir=self.zipped_code_dir,
156-
s3_bucket=self.s3_bucket,
157-
),
158-
)
141+
if self.script is not None:
142+
response = lambda_client.update_function_code(
143+
FunctionName=self.function_name, ZipFile=_zip_lambda_code(self.script)
144+
)
145+
else:
146+
response = lambda_client.update_function_code(
147+
FunctionName=(self.function_name or self.function_arn),
148+
S3Bucket=self.s3_bucket,
149+
S3Key=_upload_to_s3(
150+
s3_client=_get_s3_client(self.session),
151+
function_name=self.function_name,
152+
zipped_code_dir=self.zipped_code_dir,
153+
s3_bucket=self.s3_bucket,
154+
),
155+
)
159156
return response
160157
except ClientError as e:
161158
error = e.response["Error"]
162-
raise ValueError(error)
159+
code = error["Code"]
160+
if code == "ResourceConflictException":
161+
if i == retry_attempts - 1:
162+
raise ValueError(error)
163+
# max wait time = 2**0 + 2**1 + .. + 2**6 = 127 seconds
164+
time.sleep(2**i)
165+
else:
166+
raise ValueError(error)
163167

164168
def upsert(self):
165169
"""Method to create a lambda function or update it if it already exists

src/sagemaker/workflow/pipeline.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -215,30 +215,29 @@ def upsert(
215215
Returns:
216216
response dict from service
217217
"""
218+
exists = True
218219
try:
220+
self.describe()
221+
except ClientError:
222+
exists = False
223+
224+
if not exists:
219225
response = self.create(role_arn, description, tags, parallelism_config)
220-
except ClientError as e:
221-
error = e.response["Error"]
222-
if (
223-
error["Code"] == "ValidationException"
224-
and "Pipeline names must be unique within" in error["Message"]
225-
):
226-
response = self.update(role_arn, description)
227-
if tags is not None:
228-
old_tags = self.sagemaker_session.sagemaker_client.list_tags(
229-
ResourceArn=response["PipelineArn"]
230-
)["Tags"]
231-
232-
tag_keys = [tag["Key"] for tag in tags]
233-
for old_tag in old_tags:
234-
if old_tag["Key"] not in tag_keys:
235-
tags.append(old_tag)
236-
237-
self.sagemaker_session.sagemaker_client.add_tags(
238-
ResourceArn=response["PipelineArn"], Tags=tags
239-
)
240-
else:
241-
raise
226+
else:
227+
response = self.update(role_arn, description)
228+
if tags is not None:
229+
old_tags = self.sagemaker_session.sagemaker_client.list_tags(
230+
ResourceArn=response["PipelineArn"]
231+
)["Tags"]
232+
233+
tag_keys = [tag["Key"] for tag in tags]
234+
for old_tag in old_tags:
235+
if old_tag["Key"] not in tag_keys:
236+
tags.append(old_tag)
237+
238+
self.sagemaker_session.sagemaker_client.add_tags(
239+
ResourceArn=response["PipelineArn"], Tags=tags
240+
)
242241
return response
243242

244243
def delete(self) -> Dict[str, Any]:

tests/unit/sagemaker/workflow/test_pipeline.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -178,20 +178,15 @@ def test_large_pipeline_update(sagemaker_session_mock, role_arn):
178178

179179

180180
def test_pipeline_upsert(sagemaker_session_mock, role_arn):
181-
sagemaker_session_mock.side_effect = [
182-
ClientError(
183-
operation_name="CreatePipeline",
184-
error_response={
185-
"Error": {
186-
"Code": "ValidationException",
187-
"Message": "Pipeline names must be unique within ...",
188-
}
189-
},
190-
),
191-
{"PipelineArn": "mock_pipeline_arn"},
192-
[{"Key": "dummy", "Value": "dummy_tag"}],
193-
{},
194-
]
181+
sagemaker_session_mock.sagemaker_client.describe_pipeline.return_value = {
182+
"PipelineArn": "pipeline-arn"
183+
}
184+
sagemaker_session_mock.sagemaker_client.update_pipeline.return_value = {
185+
"PipelineArn": "pipeline-arn"
186+
}
187+
sagemaker_session_mock.sagemaker_client.list_tags.return_value = {
188+
"Tags": [{"Key": "dummy", "Value": "dummy_tag"}]
189+
}
195190

196191
pipeline = Pipeline(
197192
name="MyPipeline",
@@ -205,9 +200,9 @@ def test_pipeline_upsert(sagemaker_session_mock, role_arn):
205200
{"Key": "bar", "Value": "xyz"},
206201
]
207202
pipeline.upsert(role_arn=role_arn, tags=tags)
208-
assert sagemaker_session_mock.sagemaker_client.create_pipeline.called_with(
209-
PipelineName="MyPipeline", PipelineDefinition=pipeline.definition(), RoleArn=role_arn
210-
)
203+
204+
sagemaker_session_mock.sagemaker_client.create_pipeline.assert_not_called()
205+
211206
assert sagemaker_session_mock.sagemaker_client.update_pipeline.called_with(
212207
PipelineName="MyPipeline", PipelineDefinition=pipeline.definition(), RoleArn=role_arn
213208
)

0 commit comments

Comments
 (0)