diff --git a/.gitignore b/.gitignore index 5b496055e9..9829ed9781 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,6 @@ venv/ env/ .vscode/ **/tmp -.python-version \ No newline at end of file +.python-version +**/_repack_model.py +**/_repack_script_launcher.sh \ No newline at end of file diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py index f8a99996a5..8ba65f1eee 100644 --- a/src/sagemaker/workflow/_utils.py +++ b/src/sagemaker/workflow/_utils.py @@ -40,6 +40,21 @@ FRAMEWORK_VERSION = "0.23-1" INSTANCE_TYPE = "ml.m5.large" REPACK_SCRIPT = "_repack_model.py" +REPACK_SCRIPT_LAUNCHER = "_repack_script_launcher.sh" +LAUNCH_REPACK_SCRIPT_CMD = """ +#!/bin/bash + +var_dependencies="${SM_HP_DEPENDENCIES}" +var_inference_script="${SM_HP_INFERENCE_SCRIPT}" +var_model_archive="${SM_HP_MODEL_ARCHIVE}" +var_source_dir="${SM_HP_SOURCE_DIR}" + +python _repack_model.py \ +--dependencies "${var_dependencies}" \ +--inference_script "${var_inference_script}" \ +--model_archive "${var_model_archive}" \ +--source_dir "${var_source_dir}" +""" class _RepackModelStep(TrainingStep): @@ -155,7 +170,7 @@ def __init__( repacker = SKLearn( framework_version=FRAMEWORK_VERSION, instance_type=INSTANCE_TYPE, - entry_point=REPACK_SCRIPT, + entry_point=REPACK_SCRIPT_LAUNCHER, source_dir=self._source_dir, dependencies=self._dependencies, sagemaker_session=self.sagemaker_session, @@ -189,7 +204,7 @@ def _prepare_for_repacking(self): if self._source_dir is None: self._establish_source_dir() - self._inject_repack_script() + self._inject_repack_script_and_launcher() def _establish_source_dir(self): """If the source_dir is None, creates it for the repacking job. @@ -206,18 +221,28 @@ def _establish_source_dir(self): shutil.copy2(self._entry_point, os.path.join(self._source_dir, self._entry_point_basename)) self._entry_point = self._entry_point_basename - def _inject_repack_script(self): - """Injects the _repack_model.py script into S3 or local source directory. + def _inject_repack_script_and_launcher(self): + """Injects the _repack_model.py script and _repack_script_launcher.sh + + into S3 or local source directory. + + Note: The bash file is needed because if not supplied, the SKLearn + training job will auto install all dependencies listed in requirements.txt. + However, this auto install behavior is not expected in _RepackModelStep, + since it should just simply repack the model along with other supplied files, + e.g. the requirements.txt. If the source_dir is an S3 path: 1) downloads the source_dir tar.gz 2) extracts it 3) copies the _repack_model.py script into the extracted directory - 4) rezips the directory - 5) overwrites the S3 source_dir with the new tar.gz + 4) creates the _repack_script_launcher.sh in the extracted dir + 5) rezips the directory + 6) overwrites the S3 source_dir with the new tar.gz If the source_dir is a local path: 1) copies the _repack_model.py script into the source dir + 2) creates the _repack_script_launcher.sh in the source dir """ fname = os.path.join(os.path.dirname(__file__), REPACK_SCRIPT) if self._source_dir.lower().startswith("s3://"): @@ -231,6 +256,10 @@ def _inject_repack_script(self): t.extractall(path=targz_contents_dir) shutil.copy2(fname, os.path.join(targz_contents_dir, REPACK_SCRIPT)) + with open( + os.path.join(targz_contents_dir, REPACK_SCRIPT_LAUNCHER), "w" + ) as launcher_file: + launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD) new_targz_path = os.path.join(tmp, "new.tar.gz") with tarfile.open(new_targz_path, mode="w:gz") as t: @@ -239,6 +268,8 @@ def _inject_repack_script(self): _save_model(self._source_dir, new_targz_path, self.sagemaker_session, kms_key=None) else: shutil.copy2(fname, os.path.join(self._source_dir, REPACK_SCRIPT)) + with open(os.path.join(self._source_dir, REPACK_SCRIPT_LAUNCHER), "w") as launcher_file: + launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD) @property def arguments(self) -> RequestType: diff --git a/tests/data/pipeline/model_step/pytorch_mnist/mnist.py b/tests/data/pipeline/model_step/pytorch_mnist/mnist.py new file mode 100644 index 0000000000..ef1c15ae60 --- /dev/null +++ b/tests/data/pipeline/model_step/pytorch_mnist/mnist.py @@ -0,0 +1,239 @@ +import argparse +import json +import logging +import os +import sys +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +import torch.utils.data +import torch.utils.data.distributed +from torchvision import datasets, transforms + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(logging.StreamHandler(sys.stdout)) + + +class Net(nn.Module): + # Based on https://github.com/pytorch/examples/blob/master/mnist/main.py + def __init__(self): + logger.info("Create neural network module") + + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x, dim=1) + + +def _get_train_data_loader(training_dir, is_distributed, batch_size, **kwargs): + logger.info("Get train data loader") + dataset = datasets.MNIST( + training_dir, + train=True, + transform=transforms.Compose( + [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] + ), + download=False, # True sets a dependency on an external site for our canaries. + ) + train_sampler = ( + torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None + ) + train_loader = torch.utils.data.DataLoader( + dataset, + batch_size=batch_size, + shuffle=train_sampler is None, + sampler=train_sampler, + **kwargs + ) + return train_sampler, train_loader + + +def _get_test_data_loader(training_dir, **kwargs): + logger.info("Get test data loader") + return torch.utils.data.DataLoader( + datasets.MNIST( + training_dir, + train=False, + transform=transforms.Compose( + [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] + ), + download=False, # True sets a dependency on an external site for our canaries. + ), + batch_size=1000, + shuffle=True, + **kwargs + ) + + +def _average_gradients(model): + # Gradient averaging. + size = float(dist.get_world_size()) + for param in model.parameters(): + dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0) + param.grad.data /= size + + +def train(args): + world_size = len(args.hosts) + is_distributed = world_size > 1 + logger.debug("Number of hosts {}. Distributed training - {}".format(world_size, is_distributed)) + use_cuda = args.num_gpus > 0 + logger.debug("Number of gpus available - {}".format(args.num_gpus)) + kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {} + device = torch.device("cuda" if use_cuda else "cpu") + + if is_distributed: + # Initialize the distributed environment. + backend = "gloo" + os.environ["WORLD_SIZE"] = str(world_size) + host_rank = args.hosts.index(args.current_host) + dist.init_process_group(backend=backend, rank=host_rank, world_size=world_size) + logger.info( + "Initialized the distributed environment: '{}' backend on {} nodes. ".format( + backend, dist.get_world_size() + ) + + "Current host rank is {}. Is cuda available: {}. Number of gpus: {}".format( + dist.get_rank(), torch.cuda.is_available(), args.num_gpus + ) + ) + + # set the seed for generating random numbers + seed = 1 + torch.manual_seed(seed) + if use_cuda: + torch.cuda.manual_seed(seed) + + train_sampler, train_loader = _get_train_data_loader( + args.data_dir, is_distributed, args.batch_size, **kwargs + ) + test_loader = _get_test_data_loader(args.data_dir, **kwargs) + + logger.debug( + "Processes {}/{} ({:.0f}%) of train data".format( + len(train_loader.sampler), + len(train_loader.dataset), + 100.0 * len(train_loader.sampler) / len(train_loader.dataset), + ) + ) + + logger.debug( + "Processes {}/{} ({:.0f}%) of test data".format( + len(test_loader.sampler), + len(test_loader.dataset), + 100.0 * len(test_loader.sampler) / len(test_loader.dataset), + ) + ) + + model = Net().to(device) + if is_distributed and use_cuda: + # multi-machine multi-gpu case + logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.") + model = torch.nn.parallel.DistributedDataParallel(model) + elif use_cuda: + # single-machine multi-gpu case + logger.debug("Single-machine multi-gpu: using DataParallel().cuda().") + model = torch.nn.DataParallel(model) + else: + # single-machine or multi-machine cpu case + logger.debug("Single-machine/multi-machine cpu: using DataParallel.") + model = torch.nn.DataParallel(model) + + optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.5) + + log_interval = 100 + for epoch in range(1, args.epochs + 1): + if is_distributed: + train_sampler.set_epoch(epoch) + model.train() + for batch_idx, (data, target) in enumerate(train_loader, 1): + data, target = data.to(device), target.to(device) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + if is_distributed and not use_cuda: + # average gradients manually for multi-machine cpu case only + _average_gradients(model) + optimizer.step() + if batch_idx % log_interval == 0: + logger.debug( + "Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}".format( + epoch, + batch_idx * len(data), + len(train_loader.sampler), + 100.0 * batch_idx / len(train_loader), + loss.item(), + ) + ) + accuracy = test(model, test_loader, device) + save_model(model, args.model_dir) + + logger.debug("Overall test accuracy: {};".format(accuracy)) + + +def test(model, test_loader, device): + model.eval() + test_loss = 0 + correct = 0 + with torch.no_grad(): + for data, target in test_loader: + data, target = data.to(device), target.to(device) + output = model(data) + test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss + pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability + correct += pred.eq(target.view_as(pred)).sum().item() + + test_loss /= len(test_loader.dataset) + accuracy = 100.0 * correct / len(test_loader.dataset) + + logger.debug( + "Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format( + test_loss, correct, len(test_loader.dataset), accuracy + ) + ) + + return accuracy + + +def model_fn(model_dir): + model = torch.nn.DataParallel(Net()) + with open(os.path.join(model_dir, "model.pth"), "rb") as f: + model.load_state_dict(torch.load(f)) + return model + + +def save_model(model, model_dir): + logger.info("Saving the model.") + path = os.path.join(model_dir, "model.pth") + # recommended way from http://pytorch.org/docs/master/notes/serialization.html + torch.save(model.state_dict(), path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--epochs", type=int, default=1, metavar="N") + parser.add_argument("--batch-size", type=int, default=64, metavar="N") + + # Container environment + parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"])) + parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"]) + parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"]) + parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"]) + parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"]) + parser.add_argument("--num-cpus", type=int, default=os.environ["SM_NUM_CPUS"]) + + train(parser.parse_args()) diff --git a/tests/data/pipeline/model_step/pytorch_mnist/requirements.txt b/tests/data/pipeline/model_step/pytorch_mnist/requirements.txt new file mode 100644 index 0000000000..56d09228be --- /dev/null +++ b/tests/data/pipeline/model_step/pytorch_mnist/requirements.txt @@ -0,0 +1 @@ +scipy>=1.8.1 diff --git a/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/test.pt b/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/test.pt new file mode 100644 index 0000000000..1a15176532 Binary files /dev/null and b/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/test.pt differ diff --git a/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/training.pt b/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/training.pt new file mode 100644 index 0000000000..77438dd6a0 Binary files /dev/null and b/tests/data/pipeline/model_step/pytorch_mnist/training/MNIST/processed/training.pt differ diff --git a/tests/integ/s3_utils.py b/tests/integ/s3_utils.py index 6f09364b81..58a403341e 100644 --- a/tests/integ/s3_utils.py +++ b/tests/integ/s3_utils.py @@ -12,7 +12,9 @@ # language governing permissions and limitations under the License. from __future__ import absolute_import +import os import re +import tarfile import boto3 from six.moves.urllib.parse import urlparse @@ -43,3 +45,14 @@ def assert_s3_file_patterns_exist(sagemaker_session, s3_url, file_patterns): found = [x["Key"] for x in contents if search_pattern.search(x["Key"])] if not found: raise ValueError("File {} is not found under {}".format(pattern, s3_url)) + + +def extract_files_from_s3(s3_url, tmpdir, sagemaker_session): + parsed_url = urlparse(s3_url) + s3 = boto3.resource("s3", region_name=sagemaker_session.boto_region_name) + + model = os.path.join(tmpdir, "model") + s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip("/"), model) + + with tarfile.open(model, "r") as tar_file: + tar_file.extractall(tmpdir) diff --git a/tests/integ/sagemaker/workflow/test_workflow.py b/tests/integ/sagemaker/workflow/test_workflow.py index 6d6a498761..c91d900cf5 100644 --- a/tests/integ/sagemaker/workflow/test_workflow.py +++ b/tests/integ/sagemaker/workflow/test_workflow.py @@ -15,6 +15,7 @@ import json import os import re +import tempfile import time from contextlib import contextmanager @@ -23,7 +24,12 @@ from botocore.exceptions import WaiterError import pandas as pd -from sagemaker.workflow.model_step import ModelStep, _REGISTER_MODEL_NAME_BASE +from tests.integ.s3_utils import extract_files_from_s3 +from sagemaker.workflow.model_step import ( + ModelStep, + _REGISTER_MODEL_NAME_BASE, + _REPACK_MODEL_NAME_BASE, +) from sagemaker.parameter import IntegerParameter from sagemaker.pytorch import PyTorch, PyTorchModel from sagemaker.tuner import HyperparameterTuner @@ -1021,8 +1027,8 @@ def test_model_registration_with_tuning_model( pipeline_name, region_name, ): - base_dir = os.path.join(DATA_DIR, "pytorch_mnist") - entry_point = os.path.join(base_dir, "mnist.py") + base_dir = os.path.join(DATA_DIR, "pipeline/model_step/pytorch_mnist") + entry_point = "mnist.py" input_path = pipeline_session.upload_data( path=os.path.join(base_dir, "training"), key_prefix="integ-test-data/pytorch_mnist/training", @@ -1036,9 +1042,10 @@ def test_model_registration_with_tuning_model( # since instance_type is used to retrieve image_uri in compile time (PySDK) pytorch_estimator = PyTorch( entry_point=entry_point, + source_dir=base_dir, role=role, - framework_version="1.5.0", - py_version="py3", + framework_version="1.10", + py_version="py38", instance_count=instance_count, instance_type=instance_type, sagemaker_session=pipeline_session, @@ -1073,7 +1080,9 @@ def test_model_registration_with_tuning_model( s3_bucket=pipeline_session.default_bucket(), ), entry_point=entry_point, - framework_version="1.5.0", + source_dir=base_dir, + framework_version="1.10", + py_version="py38", sagemaker_session=pipeline_session, ) step_model_regis_args = model.register( @@ -1119,9 +1128,38 @@ def test_model_registration_with_tuning_model( assert step["StepStatus"] == "Succeeded" if _REGISTER_MODEL_NAME_BASE in step["StepName"]: assert step["Metadata"]["RegisterModel"] + if _REPACK_MODEL_NAME_BASE in step["StepName"]: + _verify_repack_output(step, pipeline_session) assert len(execution_steps) == 3 finally: try: pipeline.delete() except Exception: pass + + +def _verify_repack_output(repack_step_dict, sagemaker_session): + # This is to verify if the `requirements.txt` provided in ModelStep + # is not auto installed in the Repack step but is successfully repacked + # in the new model.tar.gz + # The repack step is using an old version of SKLearn framework "0.23-1" + # so if the `requirements.txt` is auto installed, it should raise an exception + # caused by the unsupported library version listed in the `requirements.txt` + training_job_arn = repack_step_dict["Metadata"]["TrainingJob"]["Arn"] + job_description = sagemaker_session.sagemaker_client.describe_training_job( + TrainingJobName=training_job_arn.split("/")[1] + ) + model_uri = job_description["ModelArtifacts"]["S3ModelArtifacts"] + with tempfile.TemporaryDirectory() as tmp: + extract_files_from_s3(s3_url=model_uri, tmpdir=tmp, sagemaker_session=sagemaker_session) + + def walk(): + results = set() + for root, dirs, files in os.walk(tmp): + relative_path = root.replace(tmp, "") + for f in files: + results.add(f"{relative_path}/{f}") + return results + + tar_files = walk() + assert {"/code/mnist.py", "/code/requirements.txt", "/model.pth"}.issubset(tar_files) diff --git a/tests/integ/test_horovod.py b/tests/integ/test_horovod.py index 7be3bc1abd..39b0b1af81 100644 --- a/tests/integ/test_horovod.py +++ b/tests/integ/test_horovod.py @@ -14,14 +14,12 @@ import json import os -import tarfile -import boto3 import pytest -from six.moves.urllib.parse import urlparse import sagemaker.utils import tests.integ as integ +from tests.integ.s3_utils import extract_files_from_s3 from tests.integ.utils import gpu_list, retry_with_instance_list from sagemaker.tensorflow import TensorFlow from tests.integ import timeout @@ -74,17 +72,6 @@ def read_json(file, tmp): return json.load(f) -def extract_files_from_s3(s3_url, tmpdir, sagemaker_session): - parsed_url = urlparse(s3_url) - s3 = boto3.resource("s3", region_name=sagemaker_session.boto_region_name) - - model = os.path.join(tmpdir, "model") - s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip("/"), model) - - with tarfile.open(model, "r") as tar_file: - tar_file.extractall(tmpdir) - - def _create_and_fit_estimator(sagemaker_session, tf_version, py_version, instance_type, tmpdir): job_name = sagemaker.utils.unique_name_from_base("tf-horovod") estimator = TensorFlow( diff --git a/tests/integ/test_horovod_mx.py b/tests/integ/test_horovod_mx.py index eba48b2f8d..7bd6a641e0 100644 --- a/tests/integ/test_horovod_mx.py +++ b/tests/integ/test_horovod_mx.py @@ -14,14 +14,12 @@ import json import os -import tarfile -import boto3 import pytest -from six.moves.urllib.parse import urlparse import sagemaker.utils import tests.integ as integ +from tests.integ.s3_utils import extract_files_from_s3 from sagemaker.mxnet import MXNet from tests.integ import timeout from tests.integ.utils import gpu_list, retry_with_instance_list @@ -74,17 +72,6 @@ def read_json(file, tmp): return json.load(f) -def extract_files_from_s3(s3_url, tmpdir, sagemaker_session): - parsed_url = urlparse(s3_url) - s3 = boto3.resource("s3", region_name=sagemaker_session.boto_region_name) - - model = os.path.join(tmpdir, "model") - s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip("/"), model) - - with tarfile.open(model, "r") as tar_file: - tar_file.extractall(tmpdir) - - def _create_and_fit_estimator(mxnet_version, py_version, sagemaker_session, instance_type, tmpdir): job_name = sagemaker.utils.unique_name_from_base("mx-horovod") estimator = MXNet( diff --git a/tests/unit/sagemaker/workflow/test_model_step.py b/tests/unit/sagemaker/workflow/test_model_step.py index cfeb8d5a03..9874ba9c23 100644 --- a/tests/unit/sagemaker/workflow/test_model_step.py +++ b/tests/unit/sagemaker/workflow/test_model_step.py @@ -32,6 +32,7 @@ from sagemaker.tensorflow import TensorFlowModel from sagemaker.transformer import Transformer from sagemaker.tuner import HyperparameterTuner +from sagemaker.workflow._utils import REPACK_SCRIPT_LAUNCHER from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.model_step import ( @@ -188,7 +189,9 @@ def test_register_model_with_runtime_repack(pipeline_session, model_data_param, } assert arguments["HyperParameters"]["inference_script"] == '"dummy_script.py"' assert arguments["HyperParameters"]["model_archive"] == {"Get": "Parameters.ModelData"} - assert arguments["HyperParameters"]["sagemaker_program"] == '"_repack_model.py"' + assert ( + arguments["HyperParameters"]["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' + ) assert "s3://" in arguments["HyperParameters"]["sagemaker_submit_directory"] assert arguments["HyperParameters"]["dependencies"] == "null" assert step["RetryPolicies"] == [ @@ -269,7 +272,9 @@ def test_create_model_with_runtime_repack(pipeline_session, model_data_param, mo } assert arguments["HyperParameters"]["inference_script"] == '"dummy_script.py"' assert arguments["HyperParameters"]["model_archive"] == {"Get": "Parameters.ModelData"} - assert arguments["HyperParameters"]["sagemaker_program"] == '"_repack_model.py"' + assert ( + arguments["HyperParameters"]["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' + ) assert "s3://" in arguments["HyperParameters"]["sagemaker_submit_directory"] assert arguments["HyperParameters"]["dependencies"] == "null" assert "repack a model with customer scripts" in step["Description"] @@ -360,7 +365,9 @@ def test_create_pipeline_model_with_runtime_repack(pipeline_session, model_data_ } assert arguments["HyperParameters"]["inference_script"] == '"dummy_script.py"' assert arguments["HyperParameters"]["model_archive"] == {"Get": "Parameters.ModelData"} - assert arguments["HyperParameters"]["sagemaker_program"] == '"_repack_model.py"' + assert ( + arguments["HyperParameters"]["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' + ) assert "s3://" in arguments["HyperParameters"]["sagemaker_submit_directory"] assert arguments["HyperParameters"]["dependencies"] == "null" assert step["RetryPolicies"] == [ @@ -460,7 +467,9 @@ def test_register_pipeline_model_with_runtime_repack(pipeline_session, model_dat } assert arguments["HyperParameters"]["inference_script"] == '"dummy_script.py"' assert arguments["HyperParameters"]["model_archive"] == {"Get": "Parameters.ModelData"} - assert arguments["HyperParameters"]["sagemaker_program"] == '"_repack_model.py"' + assert ( + arguments["HyperParameters"]["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' + ) assert "s3://" in arguments["HyperParameters"]["sagemaker_submit_directory"] assert arguments["HyperParameters"]["dependencies"] == "null" elif step["Type"] == "RegisterModel": @@ -641,7 +650,9 @@ def test_conditional_model_create_and_regis( } assert arguments["HyperParameters"]["inference_script"] == '"dummy_script.py"' assert arguments["HyperParameters"]["model_archive"] == {"Get": "Parameters.ModelData"} - assert arguments["HyperParameters"]["sagemaker_program"] == '"_repack_model.py"' + assert ( + arguments["HyperParameters"]["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' + ) assert "s3://" in arguments["HyperParameters"]["sagemaker_submit_directory"] assert arguments["HyperParameters"]["dependencies"] == "null" elif step["Type"] == "RegisterModel": diff --git a/tests/unit/sagemaker/workflow/test_step_collections.py b/tests/unit/sagemaker/workflow/test_step_collections.py index d3d1ab022b..2bf47a79d0 100644 --- a/tests/unit/sagemaker/workflow/test_step_collections.py +++ b/tests/unit/sagemaker/workflow/test_step_collections.py @@ -20,6 +20,7 @@ import pytest from sagemaker.drift_check_baselines import DriftCheckBaselines +from sagemaker.workflow._utils import REPACK_SCRIPT_LAUNCHER from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionEquals from sagemaker.workflow.model_step import ( @@ -799,7 +800,7 @@ def test_register_model_with_model_repack_with_estimator( "inference_script": '"dummy_script.py"', "dependencies": f'"{dummy_requirements}"', "model_archive": '"s3://my-bucket/model.tar.gz"', - "sagemaker_program": '"_repack_model.py"', + "sagemaker_program": f'"{REPACK_SCRIPT_LAUNCHER}"', "sagemaker_container_log_level": "20", "sagemaker_region": f'"{REGION}"', "source_dir": "null", @@ -925,7 +926,7 @@ def test_register_model_with_model_repack_with_model(model, model_metrics, drift "HyperParameters": { "inference_script": '"dummy_script.py"', "model_archive": '"s3://my-bucket/model.tar.gz"', - "sagemaker_program": '"_repack_model.py"', + "sagemaker_program": f'"{REPACK_SCRIPT_LAUNCHER}"', "sagemaker_container_log_level": "20", "sagemaker_region": f'"{REGION}"', "dependencies": "null", @@ -1055,7 +1056,7 @@ def test_register_model_with_model_repack_with_pipeline_model( "dependencies": "null", "inference_script": '"dummy_script.py"', "model_archive": '"s3://my-bucket/model.tar.gz"', - "sagemaker_program": '"_repack_model.py"', + "sagemaker_program": f'"{REPACK_SCRIPT_LAUNCHER}"', "sagemaker_container_log_level": "20", "sagemaker_region": f'"{REGION}"', "source_dir": "null", @@ -1267,7 +1268,7 @@ def test_estimator_transformer_with_model_repack_with_estimator(estimator): "model_archive": '"s3://my-bucket/model.tar.gz"', "dependencies": "null", "source_dir": "null", - "sagemaker_program": '"_repack_model.py"', + "sagemaker_program": f'"{REPACK_SCRIPT_LAUNCHER}"', "sagemaker_container_log_level": "20", "sagemaker_region": '"us-west-2"', }, diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py index e4eb05110c..dcbf5a6421 100644 --- a/tests/unit/sagemaker/workflow/test_utils.py +++ b/tests/unit/sagemaker/workflow/test_utils.py @@ -26,7 +26,12 @@ ) from sagemaker.estimator import Estimator -from sagemaker.workflow._utils import _RepackModelStep, _RegisterModelStep +from sagemaker.workflow._utils import ( + _RepackModelStep, + _RegisterModelStep, + REPACK_SCRIPT, + REPACK_SCRIPT_LAUNCHER, +) from sagemaker.workflow.properties import Properties from tests.unit.test_utils import FakeS3, list_tar_files from tests.unit import DATA_DIR @@ -115,14 +120,19 @@ def test_repack_model_step(estimator): depends_on=["TestStep"], ) request_dict = step.to_request() + # No source_dir supplied to _RepackModelStep + # so a temp dir will be created and + # the repack script and launcher files will be moved/created there + assert os.path.isfile(f"{step._source_dir}/{REPACK_SCRIPT}") + assert os.path.isfile(f"{step._source_dir}/{REPACK_SCRIPT_LAUNCHER}") hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"dummy_script.py"' assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"' - assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' + assert hyperparameters["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' assert ( hyperparameters["sagemaker_submit_directory"] - == '"s3://my-bucket/MyRepackModelStep-1be10316814854973ed1b445db3ef84e/source/sourcedir.tar.gz"' + == '"s3://my-bucket/MyRepackModelStep-b5ea77f701b47a8d075605497462ccc2/source/sourcedir.tar.gz"' ) del request_dict["Arguments"]["HyperParameters"] @@ -195,14 +205,17 @@ def test_repack_model_step_with_source_dir(estimator, source_dir): source_dir=source_dir, ) request_dict = step.to_request() - assert os.path.isfile(f"{source_dir}/_repack_model.py") + # The repack script and launcher files will be moved/created to + # the specified source_dir + assert os.path.isfile(f"{source_dir}/{REPACK_SCRIPT}") + assert os.path.isfile(f"{source_dir}/{REPACK_SCRIPT_LAUNCHER}") hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"inference.py"' assert hyperparameters["model_archive"].expr == { "Std:Join": {"On": "", "Values": [{"Get": "Steps.MyStep"}]} } - assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' + assert hyperparameters["sagemaker_program"] == f'"{REPACK_SCRIPT_LAUNCHER}"' del request_dict["Arguments"]["HyperParameters"] del request_dict["Arguments"]["AlgorithmSpecification"]["TrainingImage"] @@ -250,7 +263,6 @@ def fake_s3(tmp): def test_inject_repack_script_s3(estimator, tmp, fake_s3): - create_file_tree( tmp, [ @@ -274,12 +286,13 @@ def test_inject_repack_script_s3(estimator, tmp, fake_s3): fake_s3.tar_and_upload("model-dir", "s3://fake/location") - step._inject_repack_script() + step._prepare_for_repacking() assert list_tar_files(fake_s3.fake_upload_path, tmp) == { "/aa", "/foo/inference.py", - "/_repack_model.py", + f"/{REPACK_SCRIPT}", + f"/{REPACK_SCRIPT_LAUNCHER}", }