Skip to content

fix: Fix Repack step auto install behavior #3419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ venv/
env/
.vscode/
**/tmp
.python-version
.python-version
**/_repack_model.py
**/_repack_script_launcher.sh
43 changes: 37 additions & 6 deletions src/sagemaker/workflow/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@
FRAMEWORK_VERSION = "0.23-1"
INSTANCE_TYPE = "ml.m5.large"
REPACK_SCRIPT = "_repack_model.py"
REPACK_SCRIPT_LAUNCHER = "_repack_script_launcher.sh"
LAUNCH_REPACK_SCRIPT_CMD = """
#!/bin/bash

var_dependencies="${SM_HP_DEPENDENCIES}"
var_inference_script="${SM_HP_INFERENCE_SCRIPT}"
var_model_archive="${SM_HP_MODEL_ARCHIVE}"
var_source_dir="${SM_HP_SOURCE_DIR}"

python _repack_model.py \
--dependencies "${var_dependencies}" \
--inference_script "${var_inference_script}" \
--model_archive "${var_model_archive}" \
--source_dir "${var_source_dir}"
"""


class _RepackModelStep(TrainingStep):
Expand Down Expand Up @@ -155,7 +170,7 @@ def __init__(
repacker = SKLearn(
framework_version=FRAMEWORK_VERSION,
instance_type=INSTANCE_TYPE,
entry_point=REPACK_SCRIPT,
entry_point=REPACK_SCRIPT_LAUNCHER,
source_dir=self._source_dir,
dependencies=self._dependencies,
sagemaker_session=self.sagemaker_session,
Expand Down Expand Up @@ -189,7 +204,7 @@ def _prepare_for_repacking(self):
if self._source_dir is None:
self._establish_source_dir()

self._inject_repack_script()
self._inject_repack_script_and_launcher()

def _establish_source_dir(self):
"""If the source_dir is None, creates it for the repacking job.
Expand All @@ -206,18 +221,28 @@ def _establish_source_dir(self):
shutil.copy2(self._entry_point, os.path.join(self._source_dir, self._entry_point_basename))
self._entry_point = self._entry_point_basename

def _inject_repack_script(self):
"""Injects the _repack_model.py script into S3 or local source directory.
def _inject_repack_script_and_launcher(self):
"""Injects the _repack_model.py script and _repack_script_launcher.sh

into S3 or local source directory.

Note: The bash file is needed because if not supplied, the SKLearn
training job will auto install all dependencies listed in requirements.txt.
However, this auto install behavior is not expected in _RepackModelStep,
since it should just simply repack the model along with other supplied files,
e.g. the requirements.txt.

If the source_dir is an S3 path:
1) downloads the source_dir tar.gz
2) extracts it
3) copies the _repack_model.py script into the extracted directory
4) rezips the directory
5) overwrites the S3 source_dir with the new tar.gz
4) creates the _repack_script_launcher.sh in the extracted dir
5) rezips the directory
6) overwrites the S3 source_dir with the new tar.gz

If the source_dir is a local path:
1) copies the _repack_model.py script into the source dir
2) creates the _repack_script_launcher.sh in the source dir
"""
fname = os.path.join(os.path.dirname(__file__), REPACK_SCRIPT)
if self._source_dir.lower().startswith("s3://"):
Expand All @@ -231,6 +256,10 @@ def _inject_repack_script(self):
t.extractall(path=targz_contents_dir)

shutil.copy2(fname, os.path.join(targz_contents_dir, REPACK_SCRIPT))
with open(
os.path.join(targz_contents_dir, REPACK_SCRIPT_LAUNCHER), "w"
) as launcher_file:
launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD)

new_targz_path = os.path.join(tmp, "new.tar.gz")
with tarfile.open(new_targz_path, mode="w:gz") as t:
Expand All @@ -239,6 +268,8 @@ def _inject_repack_script(self):
_save_model(self._source_dir, new_targz_path, self.sagemaker_session, kms_key=None)
else:
shutil.copy2(fname, os.path.join(self._source_dir, REPACK_SCRIPT))
with open(os.path.join(self._source_dir, REPACK_SCRIPT_LAUNCHER), "w") as launcher_file:
launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD)

@property
def arguments(self) -> RequestType:
Expand Down
239 changes: 239 additions & 0 deletions tests/data/pipeline/model_step/pytorch_mnist/mnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import argparse
import json
import logging
import os
import sys
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
from torchvision import datasets, transforms

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))


class Net(nn.Module):
# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
def __init__(self):
logger.info("Create neural network module")

super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)


def _get_train_data_loader(training_dir, is_distributed, batch_size, **kwargs):
logger.info("Get train data loader")
dataset = datasets.MNIST(
training_dir,
train=True,
transform=transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
),
download=False, # True sets a dependency on an external site for our canaries.
)
train_sampler = (
torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
)
train_loader = torch.utils.data.DataLoader(
dataset,
batch_size=batch_size,
shuffle=train_sampler is None,
sampler=train_sampler,
**kwargs
)
return train_sampler, train_loader


def _get_test_data_loader(training_dir, **kwargs):
logger.info("Get test data loader")
return torch.utils.data.DataLoader(
datasets.MNIST(
training_dir,
train=False,
transform=transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
),
download=False, # True sets a dependency on an external site for our canaries.
),
batch_size=1000,
shuffle=True,
**kwargs
)


def _average_gradients(model):
# Gradient averaging.
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0)
param.grad.data /= size


def train(args):
world_size = len(args.hosts)
is_distributed = world_size > 1
logger.debug("Number of hosts {}. Distributed training - {}".format(world_size, is_distributed))
use_cuda = args.num_gpus > 0
logger.debug("Number of gpus available - {}".format(args.num_gpus))
kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
device = torch.device("cuda" if use_cuda else "cpu")

if is_distributed:
# Initialize the distributed environment.
backend = "gloo"
os.environ["WORLD_SIZE"] = str(world_size)
host_rank = args.hosts.index(args.current_host)
dist.init_process_group(backend=backend, rank=host_rank, world_size=world_size)
logger.info(
"Initialized the distributed environment: '{}' backend on {} nodes. ".format(
backend, dist.get_world_size()
)
+ "Current host rank is {}. Is cuda available: {}. Number of gpus: {}".format(
dist.get_rank(), torch.cuda.is_available(), args.num_gpus
)
)

# set the seed for generating random numbers
seed = 1
torch.manual_seed(seed)
if use_cuda:
torch.cuda.manual_seed(seed)

train_sampler, train_loader = _get_train_data_loader(
args.data_dir, is_distributed, args.batch_size, **kwargs
)
test_loader = _get_test_data_loader(args.data_dir, **kwargs)

logger.debug(
"Processes {}/{} ({:.0f}%) of train data".format(
len(train_loader.sampler),
len(train_loader.dataset),
100.0 * len(train_loader.sampler) / len(train_loader.dataset),
)
)

logger.debug(
"Processes {}/{} ({:.0f}%) of test data".format(
len(test_loader.sampler),
len(test_loader.dataset),
100.0 * len(test_loader.sampler) / len(test_loader.dataset),
)
)

model = Net().to(device)
if is_distributed and use_cuda:
# multi-machine multi-gpu case
logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.")
model = torch.nn.parallel.DistributedDataParallel(model)
elif use_cuda:
# single-machine multi-gpu case
logger.debug("Single-machine multi-gpu: using DataParallel().cuda().")
model = torch.nn.DataParallel(model)
else:
# single-machine or multi-machine cpu case
logger.debug("Single-machine/multi-machine cpu: using DataParallel.")
model = torch.nn.DataParallel(model)

optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.5)

log_interval = 100
for epoch in range(1, args.epochs + 1):
if is_distributed:
train_sampler.set_epoch(epoch)
model.train()
for batch_idx, (data, target) in enumerate(train_loader, 1):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
if is_distributed and not use_cuda:
# average gradients manually for multi-machine cpu case only
_average_gradients(model)
optimizer.step()
if batch_idx % log_interval == 0:
logger.debug(
"Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.sampler),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
accuracy = test(model, test_loader, device)
save_model(model, args.model_dir)

logger.debug("Overall test accuracy: {};".format(accuracy))


def test(model, test_loader, device):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(test_loader.dataset)
accuracy = 100.0 * correct / len(test_loader.dataset)

logger.debug(
"Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss, correct, len(test_loader.dataset), accuracy
)
)

return accuracy


def model_fn(model_dir):
model = torch.nn.DataParallel(Net())
with open(os.path.join(model_dir, "model.pth"), "rb") as f:
model.load_state_dict(torch.load(f))
return model


def save_model(model, model_dir):
logger.info("Saving the model.")
path = os.path.join(model_dir, "model.pth")
# recommended way from http://pytorch.org/docs/master/notes/serialization.html
torch.save(model.state_dict(), path)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=1, metavar="N")
parser.add_argument("--batch-size", type=int, default=64, metavar="N")

# Container environment
parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"]))
parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"])
parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"])
parser.add_argument("--num-cpus", type=int, default=os.environ["SM_NUM_CPUS"])

train(parser.parse_args())
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
scipy>=1.8.1
Binary file not shown.
Binary file not shown.
13 changes: 13 additions & 0 deletions tests/integ/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os
import re
import tarfile

import boto3
from six.moves.urllib.parse import urlparse
Expand Down Expand Up @@ -43,3 +45,14 @@ def assert_s3_file_patterns_exist(sagemaker_session, s3_url, file_patterns):
found = [x["Key"] for x in contents if search_pattern.search(x["Key"])]
if not found:
raise ValueError("File {} is not found under {}".format(pattern, s3_url))


def extract_files_from_s3(s3_url, tmpdir, sagemaker_session):
parsed_url = urlparse(s3_url)
s3 = boto3.resource("s3", region_name=sagemaker_session.boto_region_name)

model = os.path.join(tmpdir, "model")
s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip("/"), model)

with tarfile.open(model, "r") as tar_file:
tar_file.extractall(tmpdir)
Loading