Skip to content

feature: allow conditional parallel builds #2727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 94 additions & 56 deletions ci-scripts/queue_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,100 +23,138 @@
).get_caller_identity()["Account"]
bucket_name = "sagemaker-us-west-2-%s" % account

MAX_IN_PROGRESS_BUILDS = 3
INTERVAL_BETWEEN_CONCURRENT_RUNS = 15 # minutes
CLEAN_UP_TICKETS_OLDER_THAN = 8 # hours


def queue_build():
build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
source_version = re.sub(
"[_/]",
"-",
os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
)
ticket_number = int(1000 * time.time())
filename = "%s_%s_%s" % (ticket_number, build_id, source_version)

print("Created queue ticket %s" % ticket_number)

_write_ticket(filename)
files = _list_tickets()
_cleanup_tickets_older_than_8_hours(files)
_wait_for_other_builds(files, ticket_number)
_cleanup_tickets_older_than(files)
_wait_for_other_builds(ticket_number)


def _build_info_from_file(file):
filename = file.key.split("/")[1]
filename = file.key.split("/")[2]
ticket_number, build_id, source_version = filename.split("_")
return int(ticket_number), build_id, source_version


def _wait_for_other_builds(files, ticket_number):
newfiles = list(filter(lambda file: not _file_older_than(file), files))
sorted_files = list(sorted(newfiles, key=lambda y: y.key))
def _wait_for_other_builds(ticket_number):
sorted_files = _list_tickets()

print("build queue status:")
print()

for order, file in enumerate(sorted_files):
file_ticket_number, build_id, source_version = _build_info_from_file(file)
print(
"%s -> %s %s, ticket number: %s" % (order, build_id, source_version, file_ticket_number)
"%s -> %s %s, ticket number: %s status: %s"
% (order, build_id, source_version, file_ticket_number, file.key.split("/")[1])
)
print()
build_id = re.sub("[_/]", "-", os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID"))
source_version = re.sub(
"[_/]",
"-",
os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION"),
)
filename = "%s_%s_%s" % (ticket_number, build_id, source_version)
s3_file_obj = _write_ticket(filename, status="waiting")
print("Build %s waiting to be scheduled" % filename)

while True:
_cleanup_tickets_with_terminal_states()
waiting_tickets = _list_tickets("waiting")
if waiting_tickets:
first_waiting_ticket_number, _, _ = _build_info_from_file(_list_tickets("waiting")[0])
else:
first_waiting_ticket_number = ticket_number

if (
len(_list_tickets(status="in-progress")) < 3
and last_in_progress_elapsed_time_check()
and first_waiting_ticket_number == ticket_number
):
# put the build in progress
print("Scheduling build %s for running.." % filename)
s3_file_obj.delete()
_write_ticket(filename, status="in-progress")
break
else:
# wait
time.sleep(30)

for file in sorted_files:
file_ticket_number, build_id, source_version = _build_info_from_file(file)

if file_ticket_number == ticket_number:
def last_in_progress_elapsed_time_check():
    """Report whether enough time has passed since the newest in-progress ticket.

    Returns:
        bool: True when there are no in-progress tickets, or when the whole
        number of minutes elapsed since the last in-progress ticket's number
        (a millisecond timestamp) is strictly greater than
        INTERVAL_BETWEEN_CONCURRENT_RUNS.
    """
    in_progress_tickets = _list_tickets("in-progress")
    if not in_progress_tickets:
        return True

    # Reuse the listing fetched above instead of listing again: a second call
    # costs another S3 round trip and could come back empty if the ticket was
    # removed in between, which would make the [-1] lookup raise IndexError.
    last_in_progress_ticket, _, _ = _build_info_from_file(in_progress_tickets[-1])

    elapsed_ms = int(1000 * time.time()) - last_in_progress_ticket
    # Truncated to whole minutes, so the strict comparison below only passes
    # once a full minute beyond the interval has elapsed.
    elapsed_minutes = int(elapsed_ms / (1000 * 60))
    return elapsed_minutes > INTERVAL_BETWEEN_CONCURRENT_RUNS

break
else:
while True:
client = boto3.client("codebuild")
response = client.batch_get_builds(ids=[build_id])
build_status = response["builds"][0]["buildStatus"]

if build_status == "IN_PROGRESS":
print(
"waiting on build %s %s %s" % (build_id, source_version, file_ticket_number)
)
time.sleep(30)
else:
print("build %s finished, deleting lock" % build_id)
file.delete()
break


def _cleanup_tickets_older_than_8_hours(files):

def _cleanup_tickets_with_terminal_states():
    """Delete queue tickets whose CodeBuild build is no longer in progress.

    Lists every ticket, asks CodeBuild for the status of the corresponding
    builds, and deletes each ticket whose build reports any status other than
    "IN_PROGRESS". Tickets whose build id is not present in the response are
    left untouched.
    """
    files = _list_tickets()
    if not files:
        # batch_get_builds requires at least one id; nothing to clean up.
        return

    tickets = [(file, _build_info_from_file(file)[1]) for file in files]
    build_ids = sorted({build_id for _, build_id in tickets})

    client = boto3.client("codebuild")
    status_by_build_id = {}
    # batch_get_builds accepts at most 100 ids per call, so query in chunks.
    for start in range(0, len(build_ids), 100):
        response = client.batch_get_builds(ids=build_ids[start : start + 100])
        for build_details in response["builds"]:
            status_by_build_id[build_details["id"]] = build_details["buildStatus"]

    # Look statuses up by build id rather than pairing tickets with results
    # positionally: a single id missing from the response would otherwise
    # shift every later pair and leave finished builds' tickets in place.
    for file, build_id in tickets:
        build_status = status_by_build_id.get(build_id)
        if build_status is not None and build_status != "IN_PROGRESS":
            print(
                "Build %s in terminal state: %s, deleting lock"
                % (build_id, build_status)
            )
            file.delete()


def _cleanup_tickets_older_than(files):
    """Delete tickets that _file_older_than reports as expired.

    Args:
        files: ticket objects exposing ``key`` and ``delete()``.

    Returns:
        The same ``files`` sequence that was passed in, unchanged; deleted
        tickets are not removed from it.
    """
    for file in files:
        if _file_older_than(file):
            # Report the configured threshold rather than a hard-coded "8" so
            # the message stays accurate if CLEAN_UP_TICKETS_OLDER_THAN changes.
            print(
                "object %s older than %s hours. Deleting"
                % (file.key, CLEAN_UP_TICKETS_OLDER_THAN)
            )
            file.delete()
    return files


def _list_tickets():
def _list_tickets(status=None):
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
objects = [file for file in bucket.objects.filter(Prefix="ci-lock/")]
files = list(filter(lambda x: x != "ci-lock/", objects))
return files
prefix = "ci-integ-queue/{}/".format(status) if status else "ci-integ-queue/"
objects = [file for file in bucket.objects.filter(Prefix=prefix)]
files = list(filter(lambda x: x != prefix, objects))
sorted_files = list(sorted(files, key=lambda y: y.key))
return sorted_files


def _file_older_than(file):
timelimit = 1000 * 60 * 60 * 8

timelimit = 1000 * 60 * 60 * CLEAN_UP_TICKETS_OLDER_THAN
file_ticket_number, build_id, source_version = _build_info_from_file(file)
return int(1000 * time.time()) - file_ticket_number > timelimit

return int(time.time()) - file_ticket_number > timelimit


def _write_ticket(ticket_number):

if not os.path.exists("ci-lock"):
os.mkdir("ci-lock")
def _write_ticket(filename, status="waiting"):
file_path = "ci-integ-queue/{}".format(status)
if not os.path.exists(file_path):
os.makedirs(file_path)

filename = "ci-lock/" + ticket_number
with open(filename, "w") as file:
file.write(ticket_number)
boto3.Session().resource("s3").Object(bucket_name, filename).upload_file(filename)
file_full_path = file_path + "/" + filename
with open(file_full_path, "w") as file:
file.write(filename)
s3_file_obj = boto3.Session().resource("s3").Object(bucket_name, file_full_path)
s3_file_obj.upload_file(file_full_path)
print("Build %s is now in state %s" % (filename, status))
return s3_file_obj


if __name__ == "__main__":
Expand Down