diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py
index fc4d98b596..2aa8d83225 100644
--- a/src/sagemaker/estimator.py
+++ b/src/sagemaker/estimator.py
@@ -457,7 +457,7 @@ def __init__(
         self._hyperparameters = hyperparameters.copy() if hyperparameters else {}
         self.code_location = code_location
         self.entry_point = entry_point
-        self.dependencies = dependencies
+        self.dependencies = dependencies or []
         self.uploaded_code = None
         self.tags = add_jumpstart_tags(
             tags=tags, training_model_uri=self.model_uri, training_script_uri=self.source_dir
diff --git a/src/sagemaker/workflow/steps.py b/src/sagemaker/workflow/steps.py
index 1ef63ef915..7b6a06e0a1 100644
--- a/src/sagemaker/workflow/steps.py
+++ b/src/sagemaker/workflow/steps.py
@@ -287,6 +287,16 @@ def __init__(
             )
             warnings.warn(msg)
 
+        self.job_name = None
+        if estimator.source_dir or estimator.entry_point:
+            # By default, `Estimator` will upload the local code to an S3 path
+            # containing a timestamp. This causes cache misses whenever a
+            # pipeline is updated, even if the underlying script hasn't changed.
+            # To avoid this, hash the contents of the training script and include it
+            # in the `job_name` passed to the `Estimator`, which will be used
+            # instead of the timestamped path.
+            self.job_name = self._generate_code_upload_path()
+
     @property
     def arguments(self) -> RequestType:
         """The arguments dictionary that is used to call `create_training_job`.
@@ -295,7 +305,7 @@ def arguments(self) -> RequestType:
 
         The `TrainingJobName` and `ExperimentConfig` attributes cannot be included.
         """
-        self.estimator._prepare_for_training()
+        self.estimator._prepare_for_training(self.job_name)
         train_args = _TrainingJob._get_train_args(
             self.estimator, self.inputs, experiment_config=dict()
         )
@@ -319,6 +329,28 @@ def to_request(self) -> RequestType:
 
         return request_dict
 
+    def _generate_code_upload_path(self) -> str or None:
+        """Generate an upload path for local training scripts based on their content.
+
+        `source_dir` takes precedence over `entry_point`. A hash is only computed
+        when the selected location is local (no URL scheme, or the `file` scheme).
+
+        Returns:
+            str: `<step name>-<content hash>`, truncated to 1024 characters,
+            or None if there is no code location or it is not local.
+        """
+        # NOTE(review): the `str or None` annotation evaluates to plain `str`; switch to
+        # `Optional[str]` once `typing.Optional` is confirmed to be imported in this module.
+        from sagemaker.workflow.utilities import hash_files_or_dirs
+
+        code_path = self.estimator.source_dir or self.estimator.entry_point
+        if not code_path or urlparse(code_path).scheme not in ("", "file"):
+            return None
+        # Tolerate a `dependencies` value that is None or a non-list sequence.
+        dependencies = list(self.estimator.dependencies or [])
+        code_hash = hash_files_or_dirs([code_path] + dependencies)
+        return f"{self.name}-{code_hash}"[:1024]
+
 
 class CreateModelStep(ConfigurableRetryStep):
     """`CreateModelStep` for SageMaker Pipelines Workflows."""
diff --git a/src/sagemaker/workflow/utilities.py b/src/sagemaker/workflow/utilities.py
index 16a832a14f..8f6370f5a4 100644
--- a/src/sagemaker/workflow/utilities.py
+++ b/src/sagemaker/workflow/utilities.py
@@ -13,9 +13,11 @@
 """Utilities to support workflow."""
 from __future__ import absolute_import
 
+from pathlib import Path
 from typing import List, Sequence, Union
 import hashlib
 from urllib.parse import unquote, urlparse
+from _hashlib import HASH as Hash
 
 from sagemaker.workflow.entities import (
     Entity,
@@ -23,6 +25,8 @@
 )
 from sagemaker.workflow.step_collections import StepCollection
 
+BUF_SIZE = 65536  # 64KiB
+
 
 def list_to_request(entities: Sequence[Union[Entity, StepCollection]]) -> List[RequestType]:
     """Get the request structure for list of entities.
@@ -49,15 +53,98 @@ def hash_file(path: str) -> str:
     Returns:
         str: The MD5 hash of the file.
     """
-    BUF_SIZE = 65536  # read in 64KiB chunks
+    return _hash_file(path, hashlib.md5()).hexdigest()
+
+
+def hash_files_or_dirs(paths: List[str]) -> str:
+    """Get the MD5 hash of the contents of a list of files or directories.
+
+    The paths are processed in sorted order, so the order of `paths` does not
+    affect the result. Hash is changed if:
+       * input list is changed
+       * new nested directories/files are added to any directory in the input list
+       * nested directory/file names are changed for any of the inputted directories
+       * content of files is edited
+
+    Args:
+        paths: List of file or directory paths
+    Returns:
+        str: The MD5 hash of the list of files or directories.
+    """
     md5 = hashlib.md5()
-    if path.lower().startswith("file://"):
+    for path in sorted(paths):
+        md5 = _hash_file_or_dir(path, md5)
+    return md5.hexdigest()
+
+
+def _hash_file_or_dir(path: Union[str, Path], md5: Hash) -> Hash:
+    """Update `md5` with the name and contents of a single file or directory.
+
+    A `file://` URI is first converted to a local path. The path itself is fed
+    into the hash before its contents. A path that is neither an existing
+    directory nor an existing file contributes only its name.
+
+    Args:
+        path: Path (or `file://` URI) of the file or directory.
+        md5: Hash object to update.
+    Returns:
+        Hash: The same hash object, updated in place.
+    """
+    if isinstance(path, str) and path.lower().startswith("file://"):
         path = unquote(urlparse(path).path)
-    with open(path, "rb") as f:
+    # `str()` leaves the digest unchanged for `str` input and lets `Path` objects
+    # through, which have no `encode()` method.
+    md5.update(str(path).encode())
+    if Path(path).is_dir():
+        md5 = _hash_dir(path, md5)
+    elif Path(path).is_file():
+        md5 = _hash_file(path, md5)
+    return md5
+
+
+def _hash_dir(directory: Union[str, Path], md5: Hash) -> Hash:
+    """Update `md5` with the entry names and file contents of a directory tree.
+
+    Entries are visited in sorted order; subdirectories are hashed recursively.
+
+    Args:
+        directory: Path of the directory.
+        md5: Hash object to update.
+    Returns:
+        Hash: The same hash object, updated in place.
+    Raises:
+        ValueError: If `directory` is not an existing directory.
+    """
+    if not Path(directory).is_dir():
+        raise ValueError(str(directory) + " is not a valid directory")
+    for path in sorted(Path(directory).iterdir()):
+        md5.update(path.name.encode())
+        if path.is_file():
+            md5 = _hash_file(path, md5)
+        elif path.is_dir():
+            md5 = _hash_dir(path, md5)
+    return md5
+
+
+def _hash_file(file: Union[str, Path], md5: Hash) -> Hash:
+    """Update `md5` with the contents of a file, read in `BUF_SIZE` chunks.
+
+    Args:
+        file: Path (or `file://` URI) of the file.
+        md5: Hash object to update.
+    Returns:
+        Hash: The same hash object, updated in place.
+    Raises:
+        ValueError: If `file` is not an existing file.
+    """
+    if isinstance(file, str) and file.lower().startswith("file://"):
+        file = unquote(urlparse(file).path)
+    if not Path(file).is_file():
+        raise ValueError(str(file) + " is not a valid file")
+    with open(file, "rb") as f:
         while True:
             data = f.read(BUF_SIZE)
             if not data:
                 break
             md5.update(data)
-
-    return md5.hexdigest()
+    return md5
diff --git a/tests/unit/sagemaker/workflow/test_utilities.py b/tests/unit/sagemaker/workflow/test_utilities.py
index d128d8b31d..35afeae1fd 100644
--- a/tests/unit/sagemaker/workflow/test_utilities.py
+++ b/tests/unit/sagemaker/workflow/test_utilities.py
@@ -14,7 +14,8 @@
 from __future__ import absolute_import
 
 import tempfile
-from sagemaker.workflow.utilities import hash_file
+from sagemaker.workflow.utilities import hash_file, hash_files_or_dirs
+from pathlib import Path
 
 
 def test_hash_file():
@@ -29,3 +30,70 @@ def test_hash_file_uri():
         tmp.write("hashme".encode())
         hash = hash_file(f"file:///{tmp.name}")
         assert hash == "d41d8cd98f00b204e9800998ecf8427e"
+
+
+def test_hash_files_or_dirs_with_file():
+    with tempfile.NamedTemporaryFile() as tmp:
+        tmp.write("hashme".encode())
+        hash1 = hash_files_or_dirs([f"file:///{tmp.name}"])
+        # compute hash again with no change to file
+        hash2 = hash_files_or_dirs([f"file:///{tmp.name}"])
+        assert hash1 == hash2
+
+
+def test_hash_files_or_dirs_with_directory():
+    with tempfile.TemporaryDirectory() 
as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # compute hash again with no change to directory
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 == hash2
+
+
+def test_hash_files_or_dirs_change_file_content():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # change file content
+        file_name.write_text("new text")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_rename_file():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # rename file
+        file_name.rename(temp_dir / "test1.txt")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_add_new_file():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        temp_dir = Path(tmpdirname)
+        file_name = temp_dir / "test.txt"
+        file_name.write_text("foo bar")
+        hash1 = hash_files_or_dirs([tmpdirname])
+        # add new file
+        file_name2 = temp_dir / "test2.txt"
+        file_name2.write_text("test test")
+        hash2 = hash_files_or_dirs([tmpdirname])
+        assert hash1 != hash2
+
+
+def test_hash_files_or_dirs_unsorted_input_list():
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        first = Path(tmpdirname) / "first.txt"
+        first.write_text("hashme")
+        second = Path(tmpdirname) / "second.txt"
+        second.write_text("hashme again")
+        paths = [str(first), str(second)]
+        assert hash_files_or_dirs(paths) == hash_files_or_dirs(paths[::-1])
diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py
index 413afd7b58..dc081b10c6 100644
--- a/tests/unit/sagemaker/workflow/test_utils.py
+++ 
b/tests/unit/sagemaker/workflow/test_utils.py @@ -120,6 +120,10 @@ def test_repack_model_step(estimator): assert hyperparameters["inference_script"] == '"dummy_script.py"' assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"' assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' + assert ( + hyperparameters["sagemaker_submit_directory"] + == '"s3://my-bucket/MyRepackModelStep-1be10316814854973ed1b445db3ef84e/source/sourcedir.tar.gz"' + ) del request_dict["Arguments"]["HyperParameters"] del request_dict["Arguments"]["AlgorithmSpecification"]["TrainingImage"] diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index 222d61b3d0..e0f5069bf2 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -1598,7 +1598,7 @@ def test_git_support_with_branch_and_commit_succeed(git_clone_repo, sagemaker_se git_clone_repo.side_effect = lambda gitconfig, entrypoint, source_dir=None, dependencies=None: { "entry_point": "/tmp/repo_dir/entry_point", "source_dir": None, - "dependencies": None, + "dependencies": [], } git_config = {"repo": GIT_REPO, "branch": BRANCH, "commit": COMMIT} entry_point = "entry_point" @@ -3448,7 +3448,7 @@ def test_git_support_with_branch_and_commit_succeed_estimator_class( image_uri=IMAGE_URI, ) fw.fit() - git_clone_repo.assert_called_once_with(git_config, entry_point, None, None) + git_clone_repo.assert_called_once_with(git_config, entry_point, None, []) @patch("sagemaker.estimator.Estimator._stage_user_code_in_s3")