Skip to content

Commit 6960f3f

Browse files
qidewenwhenDewen Qi
authored andcommitted
fix: Fix Repack step auto install behavior (aws#3419)
* fix: Fix Repack step auto install behavior * fix doc8 and unit tests Co-authored-by: Dewen Qi <[email protected]>
1 parent dd7b72c commit 6960f3f

File tree

13 files changed

+381
-58
lines changed

13 files changed

+381
-58
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ venv/
2929
env/
3030
.vscode/
3131
**/tmp
32-
.python-version
32+
.python-version
33+
**/_repack_model.py
34+
**/_repack_script_launcher.sh

src/sagemaker/workflow/_utils.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,21 @@
4040
FRAMEWORK_VERSION = "0.23-1"
4141
INSTANCE_TYPE = "ml.m5.large"
4242
REPACK_SCRIPT = "_repack_model.py"
43+
REPACK_SCRIPT_LAUNCHER = "_repack_script_launcher.sh"
44+
LAUNCH_REPACK_SCRIPT_CMD = """
45+
#!/bin/bash
46+
47+
var_dependencies="${SM_HP_DEPENDENCIES}"
48+
var_inference_script="${SM_HP_INFERENCE_SCRIPT}"
49+
var_model_archive="${SM_HP_MODEL_ARCHIVE}"
50+
var_source_dir="${SM_HP_SOURCE_DIR}"
51+
52+
python _repack_model.py \
53+
--dependencies "${var_dependencies}" \
54+
--inference_script "${var_inference_script}" \
55+
--model_archive "${var_model_archive}" \
56+
--source_dir "${var_source_dir}"
57+
"""
4358

4459

4560
class _RepackModelStep(TrainingStep):
@@ -155,7 +170,7 @@ def __init__(
155170
repacker = SKLearn(
156171
framework_version=FRAMEWORK_VERSION,
157172
instance_type=INSTANCE_TYPE,
158-
entry_point=REPACK_SCRIPT,
173+
entry_point=REPACK_SCRIPT_LAUNCHER,
159174
source_dir=self._source_dir,
160175
dependencies=self._dependencies,
161176
sagemaker_session=self.sagemaker_session,
@@ -189,7 +204,7 @@ def _prepare_for_repacking(self):
189204
if self._source_dir is None:
190205
self._establish_source_dir()
191206

192-
self._inject_repack_script()
207+
self._inject_repack_script_and_launcher()
193208

194209
def _establish_source_dir(self):
195210
"""If the source_dir is None, creates it for the repacking job.
@@ -206,18 +221,28 @@ def _establish_source_dir(self):
206221
shutil.copy2(self._entry_point, os.path.join(self._source_dir, self._entry_point_basename))
207222
self._entry_point = self._entry_point_basename
208223

209-
def _inject_repack_script(self):
210-
"""Injects the _repack_model.py script into S3 or local source directory.
224+
def _inject_repack_script_and_launcher(self):
225+
"""Injects the _repack_model.py script and _repack_script_launcher.sh
226+
227+
into S3 or local source directory.
228+
229+
Note: The bash file is needed because if not supplied, the SKLearn
230+
training job will auto install all dependencies listed in requirements.txt.
231+
However, this auto install behavior is not expected in _RepackModelStep,
232+
since it should just simply repack the model along with other supplied files,
233+
e.g. the requirements.txt.
211234
212235
If the source_dir is an S3 path:
213236
1) downloads the source_dir tar.gz
214237
2) extracts it
215238
3) copies the _repack_model.py script into the extracted directory
216-
4) rezips the directory
217-
5) overwrites the S3 source_dir with the new tar.gz
239+
4) creates the _repack_script_launcher.sh in the extracted dir
240+
5) rezips the directory
241+
6) overwrites the S3 source_dir with the new tar.gz
218242
219243
If the source_dir is a local path:
220244
1) copies the _repack_model.py script into the source dir
245+
2) creates the _repack_script_launcher.sh in the source dir
221246
"""
222247
fname = os.path.join(os.path.dirname(__file__), REPACK_SCRIPT)
223248
if self._source_dir.lower().startswith("s3://"):
@@ -231,6 +256,10 @@ def _inject_repack_script(self):
231256
t.extractall(path=targz_contents_dir)
232257

233258
shutil.copy2(fname, os.path.join(targz_contents_dir, REPACK_SCRIPT))
259+
with open(
260+
os.path.join(targz_contents_dir, REPACK_SCRIPT_LAUNCHER), "w"
261+
) as launcher_file:
262+
launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD)
234263

235264
new_targz_path = os.path.join(tmp, "new.tar.gz")
236265
with tarfile.open(new_targz_path, mode="w:gz") as t:
@@ -239,6 +268,8 @@ def _inject_repack_script(self):
239268
_save_model(self._source_dir, new_targz_path, self.sagemaker_session, kms_key=None)
240269
else:
241270
shutil.copy2(fname, os.path.join(self._source_dir, REPACK_SCRIPT))
271+
with open(os.path.join(self._source_dir, REPACK_SCRIPT_LAUNCHER), "w") as launcher_file:
272+
launcher_file.write(LAUNCH_REPACK_SCRIPT_CMD)
242273

243274
@property
244275
def arguments(self) -> RequestType:
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
import argparse
2+
import json
3+
import logging
4+
import os
5+
import sys
6+
import torch
7+
import torch.distributed as dist
8+
import torch.nn as nn
9+
import torch.nn.functional as F
10+
import torch.optim as optim
11+
import torch.utils.data
12+
import torch.utils.data.distributed
13+
from torchvision import datasets, transforms
14+
15+
logger = logging.getLogger(__name__)
16+
logger.setLevel(logging.DEBUG)
17+
logger.addHandler(logging.StreamHandler(sys.stdout))
18+
19+
20+
class Net(nn.Module):
21+
# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
22+
def __init__(self):
23+
logger.info("Create neural network module")
24+
25+
super(Net, self).__init__()
26+
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
27+
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
28+
self.conv2_drop = nn.Dropout2d()
29+
self.fc1 = nn.Linear(320, 50)
30+
self.fc2 = nn.Linear(50, 10)
31+
32+
def forward(self, x):
33+
x = F.relu(F.max_pool2d(self.conv1(x), 2))
34+
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
35+
x = x.view(-1, 320)
36+
x = F.relu(self.fc1(x))
37+
x = F.dropout(x, training=self.training)
38+
x = self.fc2(x)
39+
return F.log_softmax(x, dim=1)
40+
41+
42+
def _get_train_data_loader(training_dir, is_distributed, batch_size, **kwargs):
43+
logger.info("Get train data loader")
44+
dataset = datasets.MNIST(
45+
training_dir,
46+
train=True,
47+
transform=transforms.Compose(
48+
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
49+
),
50+
download=False, # True sets a dependency on an external site for our canaries.
51+
)
52+
train_sampler = (
53+
torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
54+
)
55+
train_loader = torch.utils.data.DataLoader(
56+
dataset,
57+
batch_size=batch_size,
58+
shuffle=train_sampler is None,
59+
sampler=train_sampler,
60+
**kwargs
61+
)
62+
return train_sampler, train_loader
63+
64+
65+
def _get_test_data_loader(training_dir, **kwargs):
66+
logger.info("Get test data loader")
67+
return torch.utils.data.DataLoader(
68+
datasets.MNIST(
69+
training_dir,
70+
train=False,
71+
transform=transforms.Compose(
72+
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
73+
),
74+
download=False, # True sets a dependency on an external site for our canaries.
75+
),
76+
batch_size=1000,
77+
shuffle=True,
78+
**kwargs
79+
)
80+
81+
82+
def _average_gradients(model):
83+
# Gradient averaging.
84+
size = float(dist.get_world_size())
85+
for param in model.parameters():
86+
dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0)
87+
param.grad.data /= size
88+
89+
90+
def train(args):
91+
world_size = len(args.hosts)
92+
is_distributed = world_size > 1
93+
logger.debug("Number of hosts {}. Distributed training - {}".format(world_size, is_distributed))
94+
use_cuda = args.num_gpus > 0
95+
logger.debug("Number of gpus available - {}".format(args.num_gpus))
96+
kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
97+
device = torch.device("cuda" if use_cuda else "cpu")
98+
99+
if is_distributed:
100+
# Initialize the distributed environment.
101+
backend = "gloo"
102+
os.environ["WORLD_SIZE"] = str(world_size)
103+
host_rank = args.hosts.index(args.current_host)
104+
dist.init_process_group(backend=backend, rank=host_rank, world_size=world_size)
105+
logger.info(
106+
"Initialized the distributed environment: '{}' backend on {} nodes. ".format(
107+
backend, dist.get_world_size()
108+
)
109+
+ "Current host rank is {}. Is cuda available: {}. Number of gpus: {}".format(
110+
dist.get_rank(), torch.cuda.is_available(), args.num_gpus
111+
)
112+
)
113+
114+
# set the seed for generating random numbers
115+
seed = 1
116+
torch.manual_seed(seed)
117+
if use_cuda:
118+
torch.cuda.manual_seed(seed)
119+
120+
train_sampler, train_loader = _get_train_data_loader(
121+
args.data_dir, is_distributed, args.batch_size, **kwargs
122+
)
123+
test_loader = _get_test_data_loader(args.data_dir, **kwargs)
124+
125+
logger.debug(
126+
"Processes {}/{} ({:.0f}%) of train data".format(
127+
len(train_loader.sampler),
128+
len(train_loader.dataset),
129+
100.0 * len(train_loader.sampler) / len(train_loader.dataset),
130+
)
131+
)
132+
133+
logger.debug(
134+
"Processes {}/{} ({:.0f}%) of test data".format(
135+
len(test_loader.sampler),
136+
len(test_loader.dataset),
137+
100.0 * len(test_loader.sampler) / len(test_loader.dataset),
138+
)
139+
)
140+
141+
model = Net().to(device)
142+
if is_distributed and use_cuda:
143+
# multi-machine multi-gpu case
144+
logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.")
145+
model = torch.nn.parallel.DistributedDataParallel(model)
146+
elif use_cuda:
147+
# single-machine multi-gpu case
148+
logger.debug("Single-machine multi-gpu: using DataParallel().cuda().")
149+
model = torch.nn.DataParallel(model)
150+
else:
151+
# single-machine or multi-machine cpu case
152+
logger.debug("Single-machine/multi-machine cpu: using DataParallel.")
153+
model = torch.nn.DataParallel(model)
154+
155+
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.5)
156+
157+
log_interval = 100
158+
for epoch in range(1, args.epochs + 1):
159+
if is_distributed:
160+
train_sampler.set_epoch(epoch)
161+
model.train()
162+
for batch_idx, (data, target) in enumerate(train_loader, 1):
163+
data, target = data.to(device), target.to(device)
164+
optimizer.zero_grad()
165+
output = model(data)
166+
loss = F.nll_loss(output, target)
167+
loss.backward()
168+
if is_distributed and not use_cuda:
169+
# average gradients manually for multi-machine cpu case only
170+
_average_gradients(model)
171+
optimizer.step()
172+
if batch_idx % log_interval == 0:
173+
logger.debug(
174+
"Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}".format(
175+
epoch,
176+
batch_idx * len(data),
177+
len(train_loader.sampler),
178+
100.0 * batch_idx / len(train_loader),
179+
loss.item(),
180+
)
181+
)
182+
accuracy = test(model, test_loader, device)
183+
save_model(model, args.model_dir)
184+
185+
logger.debug("Overall test accuracy: {};".format(accuracy))
186+
187+
188+
def test(model, test_loader, device):
189+
model.eval()
190+
test_loss = 0
191+
correct = 0
192+
with torch.no_grad():
193+
for data, target in test_loader:
194+
data, target = data.to(device), target.to(device)
195+
output = model(data)
196+
test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss
197+
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
198+
correct += pred.eq(target.view_as(pred)).sum().item()
199+
200+
test_loss /= len(test_loader.dataset)
201+
accuracy = 100.0 * correct / len(test_loader.dataset)
202+
203+
logger.debug(
204+
"Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
205+
test_loss, correct, len(test_loader.dataset), accuracy
206+
)
207+
)
208+
209+
return accuracy
210+
211+
212+
def model_fn(model_dir):
213+
model = torch.nn.DataParallel(Net())
214+
with open(os.path.join(model_dir, "model.pth"), "rb") as f:
215+
model.load_state_dict(torch.load(f))
216+
return model
217+
218+
219+
def save_model(model, model_dir):
220+
logger.info("Saving the model.")
221+
path = os.path.join(model_dir, "model.pth")
222+
# recommended way from http://pytorch.org/docs/master/notes/serialization.html
223+
torch.save(model.state_dict(), path)
224+
225+
226+
if __name__ == "__main__":
227+
parser = argparse.ArgumentParser()
228+
parser.add_argument("--epochs", type=int, default=1, metavar="N")
229+
parser.add_argument("--batch-size", type=int, default=64, metavar="N")
230+
231+
# Container environment
232+
parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"]))
233+
parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
234+
parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
235+
parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"])
236+
parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"])
237+
parser.add_argument("--num-cpus", type=int, default=os.environ["SM_NUM_CPUS"])
238+
239+
train(parser.parse_args())
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
scipy>=1.8.1
Binary file not shown.
Binary file not shown.

tests/integ/s3_utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# language governing permissions and limitations under the License.
1313
from __future__ import absolute_import
1414

15+
import os
1516
import re
17+
import tarfile
1618

1719
import boto3
1820
from six.moves.urllib.parse import urlparse
@@ -43,3 +45,14 @@ def assert_s3_file_patterns_exist(sagemaker_session, s3_url, file_patterns):
4345
found = [x["Key"] for x in contents if search_pattern.search(x["Key"])]
4446
if not found:
4547
raise ValueError("File {} is not found under {}".format(pattern, s3_url))
48+
49+
50+
def extract_files_from_s3(s3_url, tmpdir, sagemaker_session):
51+
parsed_url = urlparse(s3_url)
52+
s3 = boto3.resource("s3", region_name=sagemaker_session.boto_region_name)
53+
54+
model = os.path.join(tmpdir, "model")
55+
s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip("/"), model)
56+
57+
with tarfile.open(model, "r") as tar_file:
58+
tar_file.extractall(tmpdir)

0 commit comments

Comments
 (0)