diff --git a/ci-scripts/queue_build.py b/ci-scripts/queue_build.py
index de781be7b1..fcff0b9a9b 100644
--- a/ci-scripts/queue_build.py
+++ b/ci-scripts/queue_build.py
@@ -23,34 +23,26 @@
 ).get_caller_identity()["Account"]
 bucket_name = "sagemaker-us-west-2-%s" % account
 
+MAX_IN_PROGRESS_BUILDS = 3
+INTERVAL_BETWEEN_CONCURRENT_RUNS = 15  # minutes
+CLEAN_UP_TICKETS_OLDER_THAN = 8  # hours
+
 
 def queue_build():
-    build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
-    source_version = re.sub(
-        "[_/]",
-        "-",
-        os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
-    )
     ticket_number = int(1000 * time.time())
-    filename = "%s_%s_%s" % (ticket_number, build_id, source_version)
-
-    print("Created queue ticket %s" % ticket_number)
-
-    _write_ticket(filename)
     files = _list_tickets()
-    _cleanup_tickets_older_than_8_hours(files)
-    _wait_for_other_builds(files, ticket_number)
+    _cleanup_tickets_older_than(files)
+    _wait_for_other_builds(ticket_number)
 
 
 def _build_info_from_file(file):
-    filename = file.key.split("/")[1]
+    filename = file.key.split("/")[2]
     ticket_number, build_id, source_version = filename.split("_")
     return int(ticket_number), build_id, source_version
 
 
-def _wait_for_other_builds(files, ticket_number):
-    newfiles = list(filter(lambda file: not _file_older_than(file), files))
-    sorted_files = list(sorted(newfiles, key=lambda y: y.key))
+def _wait_for_other_builds(ticket_number):
+    sorted_files = _list_tickets()
 
     print("build queue status:")
     print()
 
@@ -58,33 +50,76 @@ def _wait_for_other_builds(files, ticket_number):
     for order, file in enumerate(sorted_files):
         file_ticket_number, build_id, source_version = _build_info_from_file(file)
         print(
-            "%s -> %s %s, ticket number: %s" % (order, build_id, source_version, file_ticket_number)
+            "%s -> %s %s, ticket number: %s status: %s"
+            % (order, build_id, source_version, file_ticket_number, file.key.split("/")[1])
         )
+    print()
 
+    build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
+    source_version = re.sub(
+        "[_/]",
+        "-",
+        os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
+    )
+    filename = "%s_%s_%s" % (ticket_number, build_id, source_version)
+    s3_file_obj = _write_ticket(filename, status="waiting")
+    print("Build %s waiting to be scheduled" % filename)
+
+    while True:
+        _cleanup_tickets_with_terminal_states()
+        waiting_tickets = _list_tickets("waiting")
+        if waiting_tickets:
+            first_waiting_ticket_number, _, _ = _build_info_from_file(_list_tickets("waiting")[0])
+        else:
+            first_waiting_ticket_number = ticket_number
+
+        if (
+            len(_list_tickets(status="in-progress")) < MAX_IN_PROGRESS_BUILDS
+            and last_in_progress_elapsed_time_check()
+            and first_waiting_ticket_number == ticket_number
+        ):
+            # put the build in progress
+            print("Scheduling build %s for running.." % filename)
+            s3_file_obj.delete()
+            _write_ticket(filename, status="in-progress")
+            break
+        else:
+            # wait
+            time.sleep(30)
 
-    for file in sorted_files:
-        file_ticket_number, build_id, source_version = _build_info_from_file(file)
-        if file_ticket_number == ticket_number:
 
+def last_in_progress_elapsed_time_check():
+    in_progress_tickets = _list_tickets("in-progress")
+    if not in_progress_tickets:
+        return True
+    last_in_progress_ticket, _, _ = _build_info_from_file(_list_tickets("in-progress")[-1])
+    _elapsed_time = int(1000 * time.time()) - last_in_progress_ticket
+    last_in_progress_elapsed_time = int(_elapsed_time / (1000 * 60))  # in minutes
+    return last_in_progress_elapsed_time > INTERVAL_BETWEEN_CONCURRENT_RUNS
 
-            break
-        else:
-            while True:
-                client = boto3.client("codebuild")
-                response = client.batch_get_builds(ids=[build_id])
-                build_status = response["builds"][0]["buildStatus"]
-
-                if build_status == "IN_PROGRESS":
-                    print(
-                        "waiting on build %s %s %s" % (build_id, source_version, file_ticket_number)
-                    )
-                    time.sleep(30)
-                else:
-                    print("build %s finished, deleting lock" % build_id)
-                    file.delete()
-                    break
-
-
-def _cleanup_tickets_older_than_8_hours(files):
+
+def _cleanup_tickets_with_terminal_states():
+    files = _list_tickets()
+    build_ids = []
+    for file in files:
+        _, build_id, _ = _build_info_from_file(file)
+        build_ids.append(build_id)
+
+    client = boto3.client("codebuild")
+    response = client.batch_get_builds(ids=build_ids)
+
+    for file, build_details in zip(files, response["builds"]):
+        _, _build_id_from_file, _ = _build_info_from_file(file)
+        build_status = build_details["buildStatus"]
+
+        if build_status != "IN_PROGRESS" and _build_id_from_file == build_details["id"]:
+            print(
+                "Build %s in terminal state: %s, deleting lock"
+                % (_build_id_from_file, build_status)
+            )
+            file.delete()
+
+
+def _cleanup_tickets_older_than(files):
     oldfiles = list(filter(_file_older_than, files))
     for file in oldfiles:
         print("object %s older than 8 hours. Deleting" % file.key)
@@ -92,31 +127,34 @@
         file.delete()
 
     return files
 
 
-def _list_tickets():
+def _list_tickets(status=None):
     s3 = boto3.resource("s3")
     bucket = s3.Bucket(bucket_name)
-    objects = [file for file in bucket.objects.filter(Prefix="ci-lock/")]
-    files = list(filter(lambda x: x != "ci-lock/", objects))
-    return files
+    prefix = "ci-integ-queue/{}/".format(status) if status else "ci-integ-queue/"
+    objects = [file for file in bucket.objects.filter(Prefix=prefix)]
+    files = list(filter(lambda x: x != prefix, objects))
+    sorted_files = list(sorted(files, key=lambda y: y.key))
+    return sorted_files
 
 
 def _file_older_than(file):
-    timelimit = 1000 * 60 * 60 * 8
-
+    timelimit = 1000 * 60 * 60 * CLEAN_UP_TICKETS_OLDER_THAN
     file_ticket_number, build_id, source_version = _build_info_from_file(file)
+    return int(1000 * time.time()) - file_ticket_number > timelimit
 
-    return int(time.time()) - file_ticket_number > timelimit
-
-
-def _write_ticket(ticket_number):
-    if not os.path.exists("ci-lock"):
-        os.mkdir("ci-lock")
+
+def _write_ticket(filename, status="waiting"):
+    file_path = "ci-integ-queue/{}".format(status)
+    if not os.path.exists(file_path):
+        os.makedirs(file_path)
 
-    filename = "ci-lock/" + ticket_number
-    with open(filename, "w") as file:
-        file.write(ticket_number)
-    boto3.Session().resource("s3").Object(bucket_name, filename).upload_file(filename)
+    file_full_path = file_path + "/" + filename
+    with open(file_full_path, "w") as file:
+        file.write(filename)
+    s3_file_obj = boto3.Session().resource("s3").Object(bucket_name, file_full_path)
+    s3_file_obj.upload_file(file_full_path)
+    print("Build %s is now in state %s" % (filename, status))
+    return s3_file_obj
 
 
 if __name__ == "__main__":
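(Not part of the patch: a quick orientation sketch. `_write_ticket()` now uploads keys of the form `ci-integ-queue/<status>/<ticket-number>_<build-id>_<source-version>`, so the status shown in the queue listing comes from the second path segment and `_build_info_from_file()` parses the third. The key below is made up for illustration.)

```python
# Hypothetical ticket key, shaped like the objects _write_ticket() uploads.
sample_key = "ci-integ-queue/waiting/1638316800000_codebuild-build-id_codebuild-source-version"

status = sample_key.split("/")[1]  # "waiting" or "in-progress"
ticket_number, build_id, source_version = sample_key.split("/")[2].split("_")
print(int(ticket_number), build_id, source_version, status)
```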
diff --git a/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst b/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst
index 9071d05145..908621ea1c 100644
--- a/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst
+++ b/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst
@@ -82,6 +82,12 @@ Pipeline
 .. autoclass:: sagemaker.workflow.pipeline._PipelineExecution
     :members:
 
+Parallelism Configuration
+-------------------------
+
+.. autoclass:: sagemaker.workflow.parallelism_config.ParallelismConfiguration
+    :members:
+
 Pipeline Experiment Config
 --------------------------
 
diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py
index 26eba556f5..828371c6dc 100644
--- a/src/sagemaker/session.py
+++ b/src/sagemaker/session.py
@@ -3556,19 +3556,17 @@ def endpoint_from_production_variants(
         Returns:
             str: The name of the created ``Endpoint``.
         """
-        if not _deployment_entity_exists(
-            lambda: self.sagemaker_client.describe_endpoint_config(EndpointConfigName=name)
-        ):
-            config_options = {"EndpointConfigName": name, "ProductionVariants": production_variants}
-            tags = _append_project_tags(tags)
-            if tags:
-                config_options["Tags"] = tags
-            if kms_key:
-                config_options["KmsKeyId"] = kms_key
-            if data_capture_config_dict is not None:
-                config_options["DataCaptureConfig"] = data_capture_config_dict
-
-            self.sagemaker_client.create_endpoint_config(**config_options)
+        config_options = {"EndpointConfigName": name, "ProductionVariants": production_variants}
+        tags = _append_project_tags(tags)
+        if tags:
+            config_options["Tags"] = tags
+        if kms_key:
+            config_options["KmsKeyId"] = kms_key
+        if data_capture_config_dict is not None:
+            config_options["DataCaptureConfig"] = data_capture_config_dict
+
+        self.sagemaker_client.create_endpoint_config(**config_options)
+
         return self.create_endpoint(endpoint_name=name, config_name=name, tags=tags, wait=wait)
 
     def expand_role(self, role):
diff --git a/src/sagemaker/workflow/parallelism_config.py b/src/sagemaker/workflow/parallelism_config.py
new file mode 100644
index 0000000000..72c180517a
--- /dev/null
+++ b/src/sagemaker/workflow/parallelism_config.py
@@ -0,0 +1,34 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Pipeline Parallelism Configuration"""
+from __future__ import absolute_import
+from sagemaker.workflow.entities import RequestType
+
+
+class ParallelismConfiguration:
+    """Parallelism config for SageMaker pipeline."""
+
+    def __init__(self, max_parallel_execution_steps: int):
+        """Create a ParallelismConfiguration.
+
+        Args:
+            max_parallel_execution_steps (int):
+                the maximum number of pipeline steps that can run in parallel
+        """
+        self.max_parallel_execution_steps = max_parallel_execution_steps
+
+    def to_request(self) -> RequestType:
+        """Returns: the request structure."""
+        return {
+            "MaxParallelExecutionSteps": self.max_parallel_execution_steps,
+        }
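(Not part of the patch: a minimal usage sketch of the new class. `to_request()` produces the request structure that the CreatePipeline and StartPipelineExecution APIs expect; the integration tests further down pass that dict form directly as `parallelism_config`.)

```python
from sagemaker.workflow.parallelism_config import ParallelismConfiguration

# Build the config and serialize it into the request shape used by the Pipelines APIs.
config = ParallelismConfiguration(max_parallel_execution_steps=50)
assert config.to_request() == {"MaxParallelExecutionSteps": 50}
```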
diff --git a/src/sagemaker/workflow/pipeline.py b/src/sagemaker/workflow/pipeline.py
index 4982c6f5fd..9b793db003 100644
--- a/src/sagemaker/workflow/pipeline.py
+++ b/src/sagemaker/workflow/pipeline.py
@@ -22,6 +22,7 @@
 import botocore
 from botocore.exceptions import ClientError
 
+from sagemaker import s3
 from sagemaker._studio import _append_project_tags
 from sagemaker.session import Session
 from sagemaker.workflow.callback_step import CallbackOutput, CallbackStep
@@ -34,6 +35,7 @@
 from sagemaker.workflow.execution_variables import ExecutionVariables
 from sagemaker.workflow.parameters import Parameter
 from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
+from sagemaker.workflow.parallelism_config import ParallelismConfiguration
 from sagemaker.workflow.properties import Properties
 from sagemaker.workflow.steps import Step
 from sagemaker.workflow.step_collections import StepCollection
@@ -94,6 +96,7 @@ def create(
         role_arn: str,
         description: str = None,
         tags: List[Dict[str, str]] = None,
+        parallelism_config: ParallelismConfiguration = None,
     ) -> Dict[str, Any]:
         """Creates a Pipeline in the Pipelines service.
 
@@ -102,37 +105,62 @@ def create(
             description (str): A description of the pipeline.
             tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
                 tags.
+            parallelism_config (Optional[ParallelismConfiguration]): Parallelism configuration
+                that is applied to each of the executions of the pipeline. It takes precedence
+                over the parallelism configuration of the parent pipeline.
 
         Returns:
             A response dict from the service.
         """
         tags = _append_project_tags(tags)
-
-        kwargs = self._create_args(role_arn, description)
+        kwargs = self._create_args(role_arn, description, parallelism_config)
         update_args(
             kwargs,
             Tags=tags,
         )
         return self.sagemaker_session.sagemaker_client.create_pipeline(**kwargs)
 
-    def _create_args(self, role_arn: str, description: str):
+    def _create_args(
+        self, role_arn: str, description: str, parallelism_config: ParallelismConfiguration
+    ):
         """Constructs the keyword argument dict for a create_pipeline call.
 
         Args:
             role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
             description (str): A description of the pipeline.
+            parallelism_config (Optional[ParallelismConfiguration]): Parallelism configuration
+                that is applied to each of the executions of the pipeline. It takes precedence
+                over the parallelism configuration of the parent pipeline.
 
         Returns:
             A keyword argument dict for calling create_pipeline.
         """
+        pipeline_definition = self.definition()
         kwargs = dict(
             PipelineName=self.name,
-            PipelineDefinition=self.definition(),
             RoleArn=role_arn,
         )
+
+        # If pipeline definition is large, upload to S3 bucket and
+        # provide PipelineDefinitionS3Location to request instead.
+        if len(pipeline_definition.encode("utf-8")) < 1024 * 100:
+            kwargs["PipelineDefinition"] = pipeline_definition
+        else:
+            desired_s3_uri = s3.s3_path_join(
+                "s3://", self.sagemaker_session.default_bucket(), self.name
+            )
+            s3.S3Uploader.upload_string_as_file_body(
+                body=pipeline_definition,
+                desired_s3_uri=desired_s3_uri,
+                sagemaker_session=self.sagemaker_session,
+            )
+            kwargs["PipelineDefinitionS3Location"] = {
+                "Bucket": self.sagemaker_session.default_bucket(),
+                "ObjectKey": self.name,
+            }
+
         update_args(
-            kwargs,
-            PipelineDescription=description,
+            kwargs, PipelineDescription=description, ParallelismConfiguration=parallelism_config
         )
 
         return kwargs
@@ -146,17 +174,25 @@ def describe(self) -> Dict[str, Any]:
         """
         return self.sagemaker_session.sagemaker_client.describe_pipeline(PipelineName=self.name)
 
-    def update(self, role_arn: str, description: str = None) -> Dict[str, Any]:
+    def update(
+        self,
+        role_arn: str,
+        description: str = None,
+        parallelism_config: ParallelismConfiguration = None,
+    ) -> Dict[str, Any]:
         """Updates a Pipeline in the Workflow service.
 
         Args:
             role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
             description (str): A description of the pipeline.
+            parallelism_config (Optional[ParallelismConfiguration]): Parallelism configuration
+                that is applied to each of the executions of the pipeline. It takes precedence
+                over the parallelism configuration of the parent pipeline.
 
         Returns:
             A response dict from the service.
         """
-        kwargs = self._create_args(role_arn, description)
+        kwargs = self._create_args(role_arn, description, parallelism_config)
         return self.sagemaker_session.sagemaker_client.update_pipeline(**kwargs)
 
     def upsert(
@@ -164,6 +200,7 @@ def upsert(
         role_arn: str,
         description: str = None,
         tags: List[Dict[str, str]] = None,
+        parallelism_config: ParallelismConfiguration = None,
     ) -> Dict[str, Any]:
         """Creates a pipeline or updates it, if it already exists.
 
@@ -172,12 +209,14 @@ def upsert(
             description (str): A description of the pipeline.
             tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
                 tags.
+            parallelism_config (Optional[ParallelismConfiguration]): Parallelism configuration
+                that is applied to each of the executions of the pipeline.
 
         Returns:
             response dict from service
         """
         try:
-            response = self.create(role_arn, description, tags)
+            response = self.create(role_arn, description, tags, parallelism_config)
         except ClientError as e:
             error = e.response["Error"]
             if (
@@ -215,6 +254,7 @@ def start(
         parameters: Dict[str, Union[str, bool, int, float]] = None,
         execution_display_name: str = None,
         execution_description: str = None,
+        parallelism_config: ParallelismConfiguration = None,
     ):
         """Starts a Pipeline execution in the Workflow service.
 
@@ -223,6 +263,9 @@ def start(
                 pipeline parameters.
             execution_display_name (str): The display name of the pipeline execution.
             execution_description (str): A description of the execution.
+            parallelism_config (Optional[ParallelismConfiguration]): Parallelism configuration
+                that is applied to each of the executions of the pipeline. It takes precedence
+                over the parallelism configuration of the parent pipeline.
 
         Returns:
             A `_PipelineExecution` instance, if successful.
@@ -245,6 +288,7 @@ def start(
             PipelineParameters=format_start_parameters(parameters),
             PipelineExecutionDescription=execution_description,
             PipelineExecutionDisplayName=execution_display_name,
+            ParallelismConfiguration=parallelism_config,
         )
         response = self.sagemaker_session.sagemaker_client.start_pipeline_execution(**kwargs)
         return _PipelineExecution(
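(Not part of the patch: the 100 KB cut-off in `_create_args` above decides whether the definition is sent inline as `PipelineDefinition` or uploaded to the default bucket and referenced via `PipelineDefinitionS3Location`; the `test_large_pipeline` integration test below exercises the S3 path. A small helper, sketched here only for illustration, mirrors that check.)

```python
from sagemaker.workflow.pipeline import Pipeline


def uses_s3_definition_location(pipeline: Pipeline) -> bool:
    """Mirror the size check in _create_args: definitions of 100 KB or more
    are uploaded to S3 and referenced by PipelineDefinitionS3Location."""
    return len(pipeline.definition().encode("utf-8")) >= 1024 * 100
```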
diff --git a/tests/integ/test_workflow.py b/tests/integ/test_workflow.py
index 2fe674a203..58b681fd0e 100644
--- a/tests/integ/test_workflow.py
+++ b/tests/integ/test_workflow.py
@@ -2757,3 +2757,99 @@ def cleanup_feature_group(feature_group: FeatureGroup):
     except Exception as e:
         print(f"Delete FeatureGroup failed with error: {e}.")
         pass
+
+
+def test_large_pipeline(sagemaker_session, role, pipeline_name, region_name):
+    instance_count = ParameterInteger(name="InstanceCount", default_value=2)
+
+    outputParam = CallbackOutput(output_name="output", output_type=CallbackOutputTypeEnum.String)
+
+    callback_steps = [
+        CallbackStep(
+            name=f"callback-step{count}",
+            sqs_queue_url="https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
+            inputs={"arg1": "foo"},
+            outputs=[outputParam],
+        )
+        for count in range(2000)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count],
+        steps=callback_steps,
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        response = pipeline.create(role)
+        create_arn = response["PipelineArn"]
+        assert re.match(
+            fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            create_arn,
+        )
+        response = pipeline.describe()
+        assert len(json.loads(pipeline.describe()["PipelineDefinition"])["Steps"]) == 2000
+
+        pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)]
+        response = pipeline.update(role)
+        update_arn = response["PipelineArn"]
+        assert re.match(
+            fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            update_arn,
+        )
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_create_and_update_with_parallelism_config(
+    sagemaker_session, role, pipeline_name, region_name
+):
+    instance_count = ParameterInteger(name="InstanceCount", default_value=2)
+
+    outputParam = CallbackOutput(output_name="output", output_type=CallbackOutputTypeEnum.String)
+
+    callback_steps = [
+        CallbackStep(
+            name=f"callback-step{count}",
+            sqs_queue_url="https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
+            inputs={"arg1": "foo"},
+            outputs=[outputParam],
+        )
+        for count in range(500)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count],
+        steps=callback_steps,
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        response = pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 50})
+        create_arn = response["PipelineArn"]
+        assert re.match(
+            fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            create_arn,
+        )
+        response = pipeline.describe()
+        assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 50
+
+        pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)]
+        response = pipeline.update(role, parallelism_config={"MaxParallelExecutionSteps": 55})
+        update_arn = response["PipelineArn"]
+        assert re.match(
+            fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            update_arn,
+        )
+
+        response = pipeline.describe()
+        assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 55
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
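(Not part of the patch: a sketch of the per-execution override added to `Pipeline.start()`. The pipeline name is a placeholder; it assumes the pipeline already exists and AWS credentials are configured, and it uses the same dict form of `parallelism_config` as the integration tests above.)

```python
from sagemaker.workflow.pipeline import Pipeline

# "MyPipeline" is a placeholder for an existing pipeline in the account/region.
pipeline = Pipeline(name="MyPipeline")
execution = pipeline.start(parallelism_config={"MaxParallelExecutionSteps": 10})
execution.wait()
```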