
Feature/large pipeline #2706


Closed
wants to merge 9 commits
150 changes: 94 additions & 56 deletions ci-scripts/queue_build.py
@@ -23,100 +23,138 @@
 ).get_caller_identity()["Account"]
 bucket_name = "sagemaker-us-west-2-%s" % account
 
+MAX_IN_PROGRESS_BUILDS = 3
+INTERVAL_BETWEEN_CONCURRENT_RUNS = 15  # minutes
+CLEAN_UP_TICKETS_OLDER_THAN = 8  # hours
+
 
 def queue_build():
     build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
     source_version = re.sub(
         "[_/]",
         "-",
         os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
     )
     ticket_number = int(1000 * time.time())
     filename = "%s_%s_%s" % (ticket_number, build_id, source_version)
 
     print("Created queue ticket %s" % ticket_number)
 
-    _write_ticket(filename)
     files = _list_tickets()
-    _cleanup_tickets_older_than_8_hours(files)
-    _wait_for_other_builds(files, ticket_number)
+    _cleanup_tickets_older_than(files)
+    _wait_for_other_builds(ticket_number)
 
 
 def _build_info_from_file(file):
-    filename = file.key.split("/")[1]
+    filename = file.key.split("/")[2]
     ticket_number, build_id, source_version = filename.split("_")
     return int(ticket_number), build_id, source_version
 
 
-def _wait_for_other_builds(files, ticket_number):
-    newfiles = list(filter(lambda file: not _file_older_than(file), files))
-    sorted_files = list(sorted(newfiles, key=lambda y: y.key))
+def _wait_for_other_builds(ticket_number):
+    sorted_files = _list_tickets()
 
     print("build queue status:")
     print()
 
     for order, file in enumerate(sorted_files):
         file_ticket_number, build_id, source_version = _build_info_from_file(file)
         print(
-            "%s -> %s %s, ticket number: %s" % (order, build_id, source_version, file_ticket_number)
+            "%s -> %s %s, ticket number: %s status: %s"
+            % (order, build_id, source_version, file_ticket_number, file.key.split("/")[1])
         )
     print()
 
-    for file in sorted_files:
-        file_ticket_number, build_id, source_version = _build_info_from_file(file)
-
-        if file_ticket_number == ticket_number:
-            break
-        else:
-            while True:
-                client = boto3.client("codebuild")
-                response = client.batch_get_builds(ids=[build_id])
-                build_status = response["builds"][0]["buildStatus"]
-
-                if build_status == "IN_PROGRESS":
-                    print(
-                        "waiting on build %s %s %s" % (build_id, source_version, file_ticket_number)
-                    )
-                    time.sleep(30)
-                else:
-                    print("build %s finished, deleting lock" % build_id)
-                    file.delete()
-                    break
+    build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
+    source_version = re.sub(
+        "[_/]",
+        "-",
+        os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
+    )
+    filename = "%s_%s_%s" % (ticket_number, build_id, source_version)
+    s3_file_obj = _write_ticket(filename, status="waiting")
+    print("Build %s waiting to be scheduled" % filename)
+
+    while True:
+        _cleanup_tickets_with_terminal_states()
+        waiting_tickets = _list_tickets("waiting")
+        if waiting_tickets:
+            first_waiting_ticket_number, _, _ = _build_info_from_file(_list_tickets("waiting")[0])
+        else:
+            first_waiting_ticket_number = ticket_number
+
+        if (
+            len(_list_tickets(status="in-progress")) < MAX_IN_PROGRESS_BUILDS
+            and last_in_progress_elapsed_time_check()
+            and first_waiting_ticket_number == ticket_number
+        ):
+            # put the build in progress
+            print("Scheduling build %s for running.." % filename)
+            s3_file_obj.delete()
+            _write_ticket(filename, status="in-progress")
+            break
+        else:
+            # wait
+            time.sleep(30)
+
+
+def last_in_progress_elapsed_time_check():
+    in_progress_tickets = _list_tickets("in-progress")
+    if not in_progress_tickets:
+        return True
+    last_in_progress_ticket, _, _ = _build_info_from_file(_list_tickets("in-progress")[-1])
+    _elapsed_time = int(1000 * time.time()) - last_in_progress_ticket
+    last_in_progress_elapsed_time = int(_elapsed_time / (1000 * 60))  # in minutes
+    return last_in_progress_elapsed_time > INTERVAL_BETWEEN_CONCURRENT_RUNS
 
 
-def _cleanup_tickets_older_than_8_hours(files):
+def _cleanup_tickets_with_terminal_states():
+    files = _list_tickets()
+    build_ids = []
+    for file in files:
+        _, build_id, _ = _build_info_from_file(file)
+        build_ids.append(build_id)
+
+    client = boto3.client("codebuild")
+    response = client.batch_get_builds(ids=build_ids)
+
+    for file, build_details in zip(files, response["builds"]):
+        _, _build_id_from_file, _ = _build_info_from_file(file)
+        build_status = build_details["buildStatus"]
+
+        if build_status != "IN_PROGRESS" and _build_id_from_file == build_details["id"]:
+            print(
+                "Build %s in terminal state: %s, deleting lock"
+                % (_build_id_from_file, build_status)
+            )
+            file.delete()
+
+
+def _cleanup_tickets_older_than(files):
     oldfiles = list(filter(_file_older_than, files))
     for file in oldfiles:
         print("object %s older than 8 hours. Deleting" % file.key)
         file.delete()
     return files
 
 
-def _list_tickets():
+def _list_tickets(status=None):
     s3 = boto3.resource("s3")
     bucket = s3.Bucket(bucket_name)
-    objects = [file for file in bucket.objects.filter(Prefix="ci-lock/")]
-    files = list(filter(lambda x: x != "ci-lock/", objects))
-    return files
+    prefix = "ci-integ-queue/{}/".format(status) if status else "ci-integ-queue/"
+    objects = [file for file in bucket.objects.filter(Prefix=prefix)]
+    files = list(filter(lambda x: x != prefix, objects))
+    sorted_files = list(sorted(files, key=lambda y: y.key))
+    return sorted_files
 
 
 def _file_older_than(file):
-    timelimit = 1000 * 60 * 60 * 8
-
+    timelimit = 1000 * 60 * 60 * CLEAN_UP_TICKETS_OLDER_THAN
     file_ticket_number, build_id, source_version = _build_info_from_file(file)
-    return int(time.time()) - file_ticket_number > timelimit
+    return int(1000 * time.time()) - file_ticket_number > timelimit  # ticket numbers are in ms
 
 
-def _write_ticket(ticket_number):
-
-    if not os.path.exists("ci-lock"):
-        os.mkdir("ci-lock")
-
-    filename = "ci-lock/" + ticket_number
-    with open(filename, "w") as file:
-        file.write(ticket_number)
-    boto3.Session().resource("s3").Object(bucket_name, filename).upload_file(filename)
+def _write_ticket(filename, status="waiting"):
+    file_path = "ci-integ-queue/{}".format(status)
+    if not os.path.exists(file_path):
+        os.makedirs(file_path)
+
+    file_full_path = file_path + "/" + filename
+    with open(file_full_path, "w") as file:
+        file.write(filename)
+    s3_file_obj = boto3.Session().resource("s3").Object(bucket_name, file_full_path)
+    s3_file_obj.upload_file(file_full_path)
+    print("Build %s is now in state %s" % (filename, status))
+    return s3_file_obj
 
 
 if __name__ == "__main__":
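The object-key convention behind these changes, and the reason _build_info_from_file moved from split("/")[1] to split("/")[2]: tickets now live under ci-integ-queue/<status>/, and a waiting ticket is promoted only when fewer than MAX_IN_PROGRESS_BUILDS builds hold in-progress tickets, the newest in-progress ticket is at least INTERVAL_BETWEEN_CONCURRENT_RUNS minutes old, and the ticket is first in the waiting queue. A minimal sketch of the key format; the concrete key below is a hypothetical example, only the layout comes from the diff above:

    # "ci-integ-queue/<status>/<millis>_<build-id>_<source-version>"
    key = "ci-integ-queue/waiting/1634065668863_my-build-id_my-source-version"

    _, status, ticket = key.split("/")  # status now sits at index 1
    ticket_number, build_id, source_version = ticket.split("_")

    print(status)              # waiting
    print(int(ticket_number))  # 1634065668863 -- creation time in ms, gives FIFO order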
6 changes: 6 additions & 0 deletions doc/workflows/pipelines/sagemaker.workflow.pipelines.rst
@@ -82,6 +82,12 @@ Pipeline
 .. autoclass:: sagemaker.workflow.pipeline._PipelineExecution
    :members:
 
+Parallelism Configuration
+-------------------------
+
+.. autoclass:: sagemaker.workflow.parallelism_config.ParallelismConfiguration
+   :members:
+
 Pipeline Experiment Config
 --------------------------
24 changes: 11 additions & 13 deletions src/sagemaker/session.py
@@ -3556,19 +3556,17 @@ def endpoint_from_production_variants(
         Returns:
             str: The name of the created ``Endpoint``.
         """
-        if not _deployment_entity_exists(
-            lambda: self.sagemaker_client.describe_endpoint_config(EndpointConfigName=name)
-        ):
-            config_options = {"EndpointConfigName": name, "ProductionVariants": production_variants}
-            tags = _append_project_tags(tags)
-            if tags:
-                config_options["Tags"] = tags
-            if kms_key:
-                config_options["KmsKeyId"] = kms_key
-            if data_capture_config_dict is not None:
-                config_options["DataCaptureConfig"] = data_capture_config_dict
-
-            self.sagemaker_client.create_endpoint_config(**config_options)
+        config_options = {"EndpointConfigName": name, "ProductionVariants": production_variants}
+        tags = _append_project_tags(tags)
+        if tags:
+            config_options["Tags"] = tags
+        if kms_key:
+            config_options["KmsKeyId"] = kms_key
+        if data_capture_config_dict is not None:
+            config_options["DataCaptureConfig"] = data_capture_config_dict
+
+        self.sagemaker_client.create_endpoint_config(**config_options)
 
         return self.create_endpoint(endpoint_name=name, config_name=name, tags=tags, wait=wait)
 
     def expand_role(self, role):
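Net effect of this simplification: create_endpoint_config is now called unconditionally, so the production variants passed in always take effect instead of being silently ignored when an endpoint config with the same name already exists. A usage sketch (the model name and instance type are illustrative; production_variant is the existing helper in sagemaker.session):

    from sagemaker.session import Session, production_variant

    sess = Session()
    variant = production_variant(
        model_name="my-model",  # assumes this model already exists
        instance_type="ml.m5.xlarge",
        initial_instance_count=1,
    )

    # With this change, the endpoint config named "my-endpoint" is (re)created
    # on every call; previously an existing config with that name was reused.
    endpoint_name = sess.endpoint_from_production_variants(
        name="my-endpoint",
        production_variants=[variant],
    )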
34 changes: 34 additions & 0 deletions src/sagemaker/workflow/parallelism_config.py
@@ -0,0 +1,34 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Pipeline Parallelism Configuration"""
from __future__ import absolute_import
from sagemaker.workflow.entities import RequestType


class ParallelismConfiguration:
    """Parallelism config for SageMaker pipeline."""

    def __init__(self, max_parallel_execution_steps: int):
        """Create a ParallelismConfiguration.

        Args:
            max_parallel_execution_steps (int):
                the maximum number of steps that can be executed in parallel.
        """
        self.max_parallel_execution_steps = max_parallel_execution_steps

    def to_request(self) -> RequestType:
        """Return the request structure."""
        return {
            "MaxParallelExecutionSteps": self.max_parallel_execution_steps,
        }
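A short usage sketch for the new class. Passing it into a pipeline via a parallelism_config keyword is an assumption about how the rest of this PR wires it up; the visible diff only adds the class and its docs entry:

    from sagemaker.workflow.parallelism_config import ParallelismConfiguration

    # Cap a pipeline at 50 concurrently executing steps.
    config = ParallelismConfiguration(max_parallel_execution_steps=50)
    print(config.to_request())  # {'MaxParallelExecutionSteps': 50}

    # Hypothetical wiring -- keyword name assumed, not shown in this diff:
    # pipeline.create(role_arn=role, parallelism_config=config)
    # pipeline.start(parallelism_config=config)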