-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feature: support large pipeline #2825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e9f59a3
04396ff
5afcfab
eac9d22
d360659
f69a544
f007d03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"). You | ||
# may not use this file except in compliance with the License. A copy of | ||
# the License is located at | ||
# | ||
# http://aws.amazon.com/apache2.0/ | ||
# | ||
# or in the "license" file accompanying this file. This file is | ||
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | ||
# ANY KIND, either express or implied. See the License for the specific | ||
# language governing permissions and limitations under the License. | ||
"""Pipeline Parallelism Configuration""" | ||
from __future__ import absolute_import | ||
from sagemaker.workflow.entities import RequestType | ||
|
||
|
||
class ParallelismConfiguration: | ||
"""Parallelism config for SageMaker pipeline.""" | ||
|
||
def __init__(self, max_parallel_execution_steps: int): | ||
"""Create a ParallelismConfiguration | ||
|
||
Args: | ||
max_parallel_execution_steps, int: | ||
max number of steps which could be parallelized | ||
""" | ||
self.max_parallel_execution_steps = max_parallel_execution_steps | ||
|
||
def to_request(self) -> RequestType: | ||
"""Returns: the request structure.""" | ||
return { | ||
"MaxParallelExecutionSteps": self.max_parallel_execution_steps, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2757,3 +2757,99 @@ def cleanup_feature_group(feature_group: FeatureGroup): | |
except Exception as e: | ||
print(f"Delete FeatureGroup failed with error: {e}.") | ||
pass | ||
|
||
|
||
def test_large_pipeline(sagemaker_session, role, pipeline_name, region_name): | ||
instance_count = ParameterInteger(name="InstanceCount", default_value=2) | ||
|
||
outputParam = CallbackOutput(output_name="output", output_type=CallbackOutputTypeEnum.String) | ||
|
||
callback_steps = [ | ||
CallbackStep( | ||
name=f"callback-step{count}", | ||
sqs_queue_url="https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue", | ||
inputs={"arg1": "foo"}, | ||
outputs=[outputParam], | ||
) | ||
for count in range(2000) | ||
] | ||
pipeline = Pipeline( | ||
name=pipeline_name, | ||
parameters=[instance_count], | ||
steps=callback_steps, | ||
sagemaker_session=sagemaker_session, | ||
) | ||
|
||
try: | ||
response = pipeline.create(role) | ||
create_arn = response["PipelineArn"] | ||
assert re.match( | ||
fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", | ||
create_arn, | ||
) | ||
response = pipeline.describe() | ||
assert len(json.loads(pipeline.describe()["PipelineDefinition"])["Steps"]) == 2000 | ||
|
||
pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)] | ||
response = pipeline.update(role) | ||
update_arn = response["PipelineArn"] | ||
assert re.match( | ||
fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", | ||
update_arn, | ||
) | ||
finally: | ||
try: | ||
pipeline.delete() | ||
except Exception: | ||
pass | ||
|
||
|
||
def test_create_and_update_with_parallelism_config( | ||
sagemaker_session, role, pipeline_name, region_name | ||
): | ||
instance_count = ParameterInteger(name="InstanceCount", default_value=2) | ||
|
||
outputParam = CallbackOutput(output_name="output", output_type=CallbackOutputTypeEnum.String) | ||
|
||
callback_steps = [ | ||
CallbackStep( | ||
name=f"callback-step{count}", | ||
sqs_queue_url="https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will it be an accessible url even if the user is from a different region (i.e., us-west-2)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not an accessible url. It is actually just a well formatted dummy URL embedded in the pipeline definition to verify the pipeline creation. |
||
inputs={"arg1": "foo"}, | ||
outputs=[outputParam], | ||
) | ||
for count in range(500) | ||
] | ||
pipeline = Pipeline( | ||
name=pipeline_name, | ||
parameters=[instance_count], | ||
steps=callback_steps, | ||
sagemaker_session=sagemaker_session, | ||
) | ||
|
||
try: | ||
response = pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 50}) | ||
create_arn = response["PipelineArn"] | ||
assert re.match( | ||
fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", | ||
create_arn, | ||
) | ||
response = pipeline.describe() | ||
assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 50 | ||
|
||
pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)] | ||
response = pipeline.update(role, parallelism_config={"MaxParallelExecutionSteps": 55}) | ||
update_arn = response["PipelineArn"] | ||
assert re.match( | ||
fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", | ||
update_arn, | ||
) | ||
|
||
response = pipeline.describe() | ||
assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 55 | ||
|
||
finally: | ||
try: | ||
pipeline.delete() | ||
except Exception: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to add default values to these parameters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually no. All of them must be explicitly set by the users.