fix: TrainingStep cache misses due to timestamp based job name #3070

Merged 1 commit on Apr 19, 2022
2 changes: 1 addition & 1 deletion src/sagemaker/estimator.py
@@ -457,7 +457,7 @@ def __init__(
         self._hyperparameters = hyperparameters.copy() if hyperparameters else {}
         self.code_location = code_location
         self.entry_point = entry_point
-        self.dependencies = dependencies
+        self.dependencies = dependencies or []
         self.uploaded_code = None
         self.tags = add_jumpstart_tags(
             tags=tags, training_model_uri=self.model_uri, training_script_uri=self.source_dir
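
A note on the `dependencies or []` change above: `TrainingStep._generate_code_upload_path` (in the steps.py diff below) concatenates the estimator's `dependencies` onto the list of paths to hash, so the old default of `None` would raise a `TypeError`. A minimal illustration, with hypothetical values:

    # Old default: dependencies may be None, and list concatenation fails.
    paths, dependencies = ["src/"], None
    # paths + dependencies  ->  TypeError: can only concatenate list (not "NoneType") to list

    # New default: None is normalized to an empty list, so hashing stays safe.
    dependencies = None or []
    assert paths + dependencies == ["src/"]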
32 changes: 31 additions & 1 deletion src/sagemaker/workflow/steps.py
@@ -287,6 +287,16 @@ def __init__(
             )
             warnings.warn(msg)
 
+        self.job_name = None
+        if estimator.source_dir or estimator.entry_point:
+            # By default, `Estimator` will upload the local code to an S3 path
+            # containing a timestamp. This causes cache misses whenever a
+            # pipeline is updated, even if the underlying script hasn't changed.
+            # To avoid this, hash the contents of the training script and include it
+            # in the `job_name` passed to the `Estimator`, which will be used
+            # instead of the timestamped path.
+            self.job_name = self._generate_code_upload_path()
Contributor:

But `job_name` will later be excluded from the definition file, right? How does the backend decide when to use the cache?

Member Author:

The backend uses a hash of all the input arguments to the step, plus the pipeline name and step name. The issue was that the S3 path where the local scripts were uploaded contained a timestamp from when the pipeline was defined, and that path is passed as a hyperparameter (`sagemaker_submit_directory`) to the training job.
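
To make the caching mechanics concrete, below is a minimal sketch of the behavior described in this thread. The `cache_key` helper is illustrative only, not the actual backend implementation; the point is that a timestamped upload path perturbs the step arguments on every upsert, while a content-hashed path keeps them, and therefore the cache key, stable.

    import hashlib
    import json

    def cache_key(pipeline_name: str, step_name: str, step_args: dict) -> str:
        # Illustrative stand-in for the backend's key: a hash over the step's
        # input arguments plus the pipeline and step names.
        payload = json.dumps(
            {"pipeline": pipeline_name, "step": step_name, "args": step_args},
            sort_keys=True,
        )
        return hashlib.md5(payload.encode()).hexdigest()

    # Timestamped upload path: a new path on every upsert -> new key -> cache miss.
    v1 = {"sagemaker_submit_directory": "s3://my-bucket/job-2022-04-18-10-00-00-123/source/sourcedir.tar.gz"}
    v2 = {"sagemaker_submit_directory": "s3://my-bucket/job-2022-04-19-09-30-00-456/source/sourcedir.tar.gz"}
    assert cache_key("my-pipeline", "train", v1) != cache_key("my-pipeline", "train", v2)

    # Content-hashed path: unchanged code -> identical arguments -> cache hit.
    v3 = {"sagemaker_submit_directory": "s3://my-bucket/MyTrainingStep-0123456789abcdef0123456789abcdef/source/sourcedir.tar.gz"}
    assert cache_key("my-pipeline", "train", v3) == cache_key("my-pipeline", "train", dict(v3))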


     @property
     def arguments(self) -> RequestType:
         """The arguments dictionary that is used to call `create_training_job`.
@@ -295,7 +305,7 @@ def arguments(self) -> RequestType:
         The `TrainingJobName` and `ExperimentConfig` attributes cannot be included.
         """
 
-        self.estimator._prepare_for_training()
+        self.estimator._prepare_for_training(self.job_name)
         train_args = _TrainingJob._get_train_args(
             self.estimator, self.inputs, experiment_config=dict()
         )
@@ -319,6 +329,26 @@ def to_request(self) -> RequestType:

         return request_dict
 
+    def _generate_code_upload_path(self) -> str or None:
+        """Generate an upload path for local training scripts based on their content."""
+        from sagemaker.workflow.utilities import hash_files_or_dirs
+
+        if self.estimator.source_dir:
+            source_dir_url = urlparse(self.estimator.source_dir)
+            if source_dir_url.scheme == "" or source_dir_url.scheme == "file":
+                code_hash = hash_files_or_dirs(
+                    [self.estimator.source_dir] + self.estimator.dependencies
+                )
+                return f"{self.name}-{code_hash}"[:1024]
+        elif self.estimator.entry_point:
+            entry_point_url = urlparse(self.estimator.entry_point)
+            if entry_point_url.scheme == "" or entry_point_url.scheme == "file":
+                code_hash = hash_files_or_dirs(
+                    [self.estimator.entry_point] + self.estimator.dependencies
+                )
+                return f"{self.name}-{code_hash}"[:1024]
+        return None
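
As a sketch of the end-to-end effect (step name, hash value, and bucket are illustrative, not from this diff): with a content-derived `job_name`, `_prepare_for_training` resolves the same S3 source directory on every pipeline build, whereas the old timestamp-based default produced a fresh path each time.

    # Mirrors the f"{self.name}-{code_hash}"[:1024] format above; values are hypothetical.
    step_name = "MyTrainingStep"
    code_hash = "0123456789abcdef0123456789abcdef"  # e.g. hash_files_or_dirs(["train.py"])
    job_name = f"{step_name}-{code_hash}"[:1024]

    # Old: base job name + timestamp, different on every upsert:
    #   s3://my-bucket/estimator-2022-04-19-09-30-00-123/source/sourcedir.tar.gz
    # New: stable, content-addressed prefix:
    print(f"s3://my-bucket/{job_name}/source/sourcedir.tar.gz")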


 class CreateModelStep(ConfigurableRetryStep):
     """`CreateModelStep` for SageMaker Pipelines Workflows."""
81 changes: 76 additions & 5 deletions src/sagemaker/workflow/utilities.py
@@ -13,16 +13,20 @@
"""Utilities to support workflow."""
from __future__ import absolute_import

from pathlib import Path
from typing import List, Sequence, Union
import hashlib
from urllib.parse import unquote, urlparse
from _hashlib import HASH as Hash

from sagemaker.workflow.entities import (
Entity,
RequestType,
)
from sagemaker.workflow.step_collections import StepCollection

BUF_SIZE = 65536 # 64KiB


 def list_to_request(entities: Sequence[Union[Entity, StepCollection]]) -> List[RequestType]:
     """Get the request structure for list of entities.
@@ -49,15 +53,82 @@ def hash_file(path: str) -> str:
     Returns:
         str: The MD5 hash of the file.
     """
-    BUF_SIZE = 65536  # read in 64KiB chunks
+    return _hash_file(path, hashlib.md5()).hexdigest()


+def hash_files_or_dirs(paths: List[str]) -> str:
+    """Get the MD5 hash of the contents of a list of files or directories.
+    Hash is changed if:
+       * input list is changed
+       * new nested directories/files are added to any directory in the input list
+       * nested directory/file names are changed for any of the inputted directories
+       * content of files is edited
+    Args:
+        paths: List of file or directory paths
+    Returns:
+        str: The MD5 hash of the list of files or directories.
+    """
     md5 = hashlib.md5()
-    if path.lower().startswith("file://"):
+    for path in sorted(paths):
+        md5 = _hash_file_or_dir(path, md5)
+    return md5.hexdigest()
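
Note: because `hash_files_or_dirs` sorts the paths before hashing, the result is insensitive to the order of the input list (the `test_hash_files_or_dirs_unsorted_input_list` test below verifies this); the "input list is changed" bullet above refers to membership, not order.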


+def _hash_file_or_dir(path: str, md5: Hash) -> Hash:
+    """Updates the inputted Hash with the contents of the current path.
+    Args:
+        path: path of file or directory
+    Returns:
+        Hash: The updated MD5 hash
+    """
+    if isinstance(path, str) and path.lower().startswith("file://"):
+        path = unquote(urlparse(path).path)
-    with open(path, "rb") as f:
+    md5.update(path.encode())
+    if Path(path).is_dir():
+        md5 = _hash_dir(path, md5)
+    elif Path(path).is_file():
+        md5 = _hash_file(path, md5)
+    return md5


+def _hash_dir(directory: Union[str, Path], md5: Hash) -> Hash:
+    """Updates the inputted Hash with the contents of the current path.
+    Args:
+        directory: path of the directory
+    Returns:
+        Hash: The updated MD5 hash
+    """
+    if not Path(directory).is_dir():
+        raise ValueError(str(directory) + " is not a valid directory")
+    for path in sorted(Path(directory).iterdir()):
+        md5.update(path.name.encode())
+        if path.is_file():
+            md5 = _hash_file(path, md5)
+        elif path.is_dir():
+            md5 = _hash_dir(path, md5)
+    return md5


+def _hash_file(file: Union[str, Path], md5: Hash) -> Hash:
+    """Updates the inputted Hash with the contents of the current path.
+    Args:
+        file: path of the file
+    Returns:
+        Hash: The updated MD5 hash
+    """
+    if isinstance(file, str) and file.lower().startswith("file://"):
+        file = unquote(urlparse(file).path)
+    if not Path(file).is_file():
+        raise ValueError(str(file) + " is not a valid file")
+    with open(file, "rb") as f:
         while True:
             data = f.read(BUF_SIZE)
             if not data:
                 break
             md5.update(data)
 
-    return md5.hexdigest()
+    return md5
70 changes: 69 additions & 1 deletion tests/unit/sagemaker/workflow/test_utilities.py
@@ -14,7 +14,8 @@
 from __future__ import absolute_import
 
 import tempfile
-from sagemaker.workflow.utilities import hash_file
+from sagemaker.workflow.utilities import hash_file, hash_files_or_dirs
+from pathlib import Path


 def test_hash_file():
@@ -29,3 +30,70 @@ def test_hash_file_uri():
         tmp.write("hashme".encode())
         hash = hash_file(f"file:///{tmp.name}")
         assert hash == "d41d8cd98f00b204e9800998ecf8427e"


+def test_hash_files_or_dirs_with_file():
+    with tempfile.NamedTemporaryFile() as tmp:
+        tmp.write("hashme".encode())
+        hash1 = hash_files_or_dirs([f"file:///{tmp.name}"])
+        # compute hash again with no change to file
+        hash2 = hash_files_or_dirs([f"file:///{tmp.name}"])
+        assert hash1 == hash2
+
+
+def test_hash_files_or_dirs_with_directory():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # compute hash again with no change to directory
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 == hash2
+
+
+def test_hash_files_or_dirs_change_file_content():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # change file content
+        file_name.write_text("new text")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_rename_file():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # rename file
+        file_name.rename(temp_dir / "test1.txt")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_add_new_file():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # add new file
+        file_name2 = temp_dir / "test2.txt"
+        file_name2.write_text("test test")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_unsorted_input_list():
+    with tempfile.NamedTemporaryFile() as tmp1:
+        tmp1.write("hashme".encode())
+        with tempfile.NamedTemporaryFile() as tmp2:
+            tmp2.write("hashme".encode())
+            hash1 = hash_files_or_dirs([tmp1.name, tmp2.name])
+            hash2 = hash_files_or_dirs([tmp2.name, tmp1.name])
+            assert hash1 == hash2
4 changes: 4 additions & 0 deletions tests/unit/sagemaker/workflow/test_utils.py
@@ -120,6 +120,10 @@ def test_repack_model_step(estimator):
assert hyperparameters["inference_script"] == '"dummy_script.py"'
assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"'
assert hyperparameters["sagemaker_program"] == '"_repack_model.py"'
assert (
hyperparameters["sagemaker_submit_directory"]
== '"s3://my-bucket/MyRepackModelStep-1be10316814854973ed1b445db3ef84e/source/sourcedir.tar.gz"'
)

del request_dict["Arguments"]["HyperParameters"]
del request_dict["Arguments"]["AlgorithmSpecification"]["TrainingImage"]
4 changes: 2 additions & 2 deletions tests/unit/test_estimator.py
@@ -1598,7 +1598,7 @@ def test_git_support_with_branch_and_commit_succeed(git_clone_repo, sagemaker_se
     git_clone_repo.side_effect = lambda gitconfig, entrypoint, source_dir=None, dependencies=None: {
         "entry_point": "/tmp/repo_dir/entry_point",
         "source_dir": None,
-        "dependencies": None,
+        "dependencies": [],
     }
     git_config = {"repo": GIT_REPO, "branch": BRANCH, "commit": COMMIT}
     entry_point = "entry_point"
@@ -3448,7 +3448,7 @@ def test_git_support_with_branch_and_commit_succeed_estimator_class(
         image_uri=IMAGE_URI,
     )
     fw.fit()
-    git_clone_repo.assert_called_once_with(git_config, entry_point, None, None)
+    git_clone_repo.assert_called_once_with(git_config, entry_point, None, [])


@patch("sagemaker.estimator.Estimator._stage_user_code_in_s3")