
local mode: support output_path #449

Merged: 4 commits, Nov 2, 2018. The diff below shows changes from 2 of those commits.

4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,10 @@
CHANGELOG
=========

1.12.1dev
=========
* enhancement: Local Mode: support output_path. Can be either file:// or s3://

1.12.0
======

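For context, a minimal usage sketch of the new behavior (image name, role, and paths are invented for illustration, and parameter names follow the SDK of this era):

from sagemaker.estimator import Estimator

estimator = Estimator(image_name='my-training-image',
                      role='SageMakerRole',
                      train_instance_count=1,
                      train_instance_type='local',
                      output_path='file:///tmp/my-model-output')
estimator.fit({'training': 'file:///tmp/my-training-data'})
# After training, model.tar.gz and output.tar.gz are copied under /tmp/my-model-output.
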
3 changes: 3 additions & 0 deletions src/sagemaker/estimator.py
@@ -104,6 +104,9 @@ def __init__(self, role, train_instance_count, train_instance_type,

self.base_job_name = base_job_name
self._current_job_name = None
if (not self.sagemaker_session.local_mode
and output_path and output_path.startswith('file://')):
raise RuntimeError('file:// output paths are only supported in Local Mode')
Contributor comment: 👌

self.output_path = output_path
self.output_kms_key = output_kms_key
self.latest_training_job = None
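As a hypothetical counter-example of what the new guard rejects (values are invented): requesting a file:// output path while targeting real SageMaker instances now fails at construction time instead of after a full training run.

estimator = Estimator(image_name='my-training-image',
                      role='SageMakerRole',
                      train_instance_count=1,
                      train_instance_type='ml.m4.xlarge',  # not local mode
                      output_path='file:///tmp/my-model-output')
# raises RuntimeError: file:// output paths are only supported in Local Mode
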
18 changes: 6 additions & 12 deletions src/sagemaker/fw_utils.py
@@ -14,12 +14,10 @@

import os
import re
import tarfile
import tempfile
from collections import namedtuple
from six.moves.urllib.parse import urlparse

from sagemaker.utils import name_from_image
import sagemaker.utils

"""This module contains utility functions shared across ``Framework`` components."""

@@ -128,14 +126,9 @@ def tar_and_upload_dir(session, bucket, s3_key_prefix, script, directory):
s3 = session.resource('s3')
key = '{}/{}'.format(s3_key_prefix, 'sourcedir.tar.gz')

with tempfile.TemporaryFile() as f:
with tarfile.open(mode='w:gz', fileobj=f) as t:
for sf in source_files:
# Add all files from the directory into the root of the directory structure of the tar
t.add(sf, arcname=os.path.basename(sf))
# Need to reset the file descriptor position after writing to prepare for read
f.seek(0)
s3.Object(bucket, key).put(Body=f)
tar_file = sagemaker.utils.create_tar_file(source_files)
s3.Object(bucket, key).upload_file(tar_file)
os.remove(tar_file)

return UploadedCode(s3_prefix='s3://{}/{}'.format(bucket, key), script_name=script_name)
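
The refactor above relies on a sagemaker.utils.create_tar_file helper. A minimal sketch of the behavior the two call sites in this diff assume (a gzip tarball of the given files, written to an explicit target or to a temporary file whose path is returned):

import os
import tarfile
import tempfile

def create_tar_file(source_files, target=None):
    # Write to the given target path, or to a fresh temporary file if none is given.
    if target:
        filename = target
    else:
        _, filename = tempfile.mkstemp()

    with tarfile.open(filename, mode='w:gz') as t:
        for sf in source_files:
            # Add each file at the root of the archive, mirroring the old inline logic.
            t.add(sf, arcname=os.path.basename(sf))
    return filename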

@@ -226,7 +219,8 @@ def model_code_key_prefix(code_location_key_prefix, model_name, image):
Returns:
str: the key prefix to be used in uploading code
"""
return '/'.join(filter(None, [code_location_key_prefix, model_name or name_from_image(image)]))
training_job_name = sagemaker.utils.name_from_image(image)
return '/'.join(filter(None, [code_location_key_prefix, model_name or training_job_name]))


def empty_framework_version_warning(default_version):
7 changes: 7 additions & 0 deletions src/sagemaker/local/data.py
@@ -13,6 +13,7 @@
from __future__ import absolute_import

import os
import platform
import sys
import tempfile
from abc import ABCMeta
@@ -162,6 +163,12 @@ def __init__(self, bucket, prefix, sagemaker_session):
root_dir = os.path.abspath(root_dir)

working_dir = tempfile.mkdtemp(dir=root_dir)
# Docker cannot mount Mac OS /var folder properly see
# https://forums.docker.com/t/var-folders-isnt-mounted-properly/9600
# Only apply this workaround if the user didn't provide an alternate storage root dir.
if root_dir is None and platform.system() == 'Darwin':
working_dir = '/private{}'.format(working_dir)

sagemaker.utils.download_folder(bucket, prefix, working_dir, sagemaker_session)
self.files = LocalFileDataSource(working_dir)

11 changes: 8 additions & 3 deletions src/sagemaker/local/entities.py
@@ -46,23 +46,28 @@ def __init__(self, container):
self.start_time = None
self.end_time = None

def start(self, input_data_config, hyperparameters, job_name):
def start(self, input_data_config, output_data_config, hyperparameters, job_name):
for channel in input_data_config:
if channel['DataSource'] and 'S3DataSource' in channel['DataSource']:
data_distribution = channel['DataSource']['S3DataSource']['S3DataDistributionType']
data_uri = channel['DataSource']['S3DataSource']['S3Uri']
elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']:
data_distribution = channel['DataSource']['FileDataSource']['FileDataDistributionType']
data_uri = channel['DataSource']['FileDataSource']['FileUri']
else:
raise ValueError('Need channel[\'DataSource\'] to have [\'S3DataSource\'] or [\'FileDataSource\']')

# use a single Data URI - this makes handling S3 and File Data easier down the stack
channel['DataUri'] = data_uri

if data_distribution != 'FullyReplicated':
raise RuntimeError('DataDistribution: %s is not currently supported in Local Mode' %
data_distribution)

self.start = datetime.datetime.now()
self.state = self._TRAINING

self.model_artifacts = self.container.train(input_data_config, hyperparameters, job_name)
self.model_artifacts = self.container.train(input_data_config, output_data_config, hyperparameters, job_name)
self.end = datetime.datetime.now()
self.state = self._COMPLETED

@@ -298,7 +303,7 @@ def _perform_batch_inference(self, input_data, output_data, **kwargs):
if 'AssembleWith' in output_data and output_data['AssembleWith'] == 'Line':
f.write(b'\n')

move_to_destination(working_dir, output_data['S3OutputPath'], self.local_session)
move_to_destination(working_dir, output_data['S3OutputPath'], self.name, self.local_session)
self.container.stop_serving()


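For reference, the configuration dictionaries these methods consume look roughly like the following (channel name, URIs, and paths are invented for illustration):

input_data_config = [{
    'ChannelName': 'training',
    'DataSource': {
        'FileDataSource': {
            'FileDataDistributionType': 'FullyReplicated',
            'FileUri': 'file:///tmp/my-training-data'
        }
    }
}]

# Either an s3:// or file:// destination; an empty string leaves the compressed
# artifacts under the container root.
output_data_config = {'S3OutputPath': 'file:///tmp/my-model-output'}
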
129 changes: 66 additions & 63 deletions src/sagemaker/local/image.py
@@ -33,6 +33,9 @@
import yaml

import sagemaker
import sagemaker.local.data
import sagemaker.local.utils
import sagemaker.utils

CONTAINER_PREFIX = 'algo'
DOCKER_COMPOSE_FILENAME = 'docker-compose.yaml'
@@ -78,7 +81,7 @@ def __init__(self, instance_type, instance_count, image, sagemaker_session=None)
self.container_root = None
self.container = None

def train(self, input_data_config, hyperparameters, job_name):
def train(self, input_data_config, output_data_config, hyperparameters, job_name):
"""Run a training job locally using docker-compose.
Args:
input_data_config (dict): The Input Data Configuration, this contains data such as the
@@ -126,23 +129,17 @@ def train(self, input_data_config, hyperparameters, job_name):
msg = "Failed to run: %s, %s" % (compose_command, str(e))
raise RuntimeError(msg)

s3_artifacts = self.retrieve_artifacts(compose_data)
artifacts = self.retrieve_artifacts(compose_data, output_data_config, job_name)

# free up the training data directory as it may contain
# lots of data downloaded from S3. This doesn't delete any local
# data that was just mounted to the container.
_delete_tree(data_dir)
_delete_tree(shared_dir)
# Also free the container config files.
for host in self.hosts:
container_config_path = os.path.join(self.container_root, host)
_delete_tree(container_config_path)

self._cleanup()
# Print our Job Complete line to have a simmilar experience to training on SageMaker where you
dirs_to_delete = [data_dir, shared_dir]
self._cleanup(dirs_to_delete)
# Print our Job Complete line to have a similar experience to training on SageMaker where you
# see this line at the end.
print('===== Job Complete =====')
return s3_artifacts
return artifacts

def serve(self, model_dir, environment):
"""Host a local endpoint using docker-compose.
@@ -188,7 +185,7 @@ def stop_serving(self):
# for serving we can delete everything in the container root.
_delete_tree(self.container_root)

def retrieve_artifacts(self, compose_data):
def retrieve_artifacts(self, compose_data, output_data_config, job_name):
"""Get the model artifacts from all the container nodes.

Used after training completes to gather the data from all the individual containers. As the
@@ -201,26 +198,49 @@ def retrieve_artifacts(self, compose_data):
Returns: Local path to the collected model artifacts.

"""
# Grab the model artifacts from all the Nodes.
s3_artifacts = os.path.join(self.container_root, 's3_artifacts')
os.mkdir(s3_artifacts)
# We need a directory to store the artifacts from all the nodes
# and another one to contain the compressed final artifacts
artifacts = os.path.join(self.container_root, 'artifacts')
compressed_artifacts = os.path.join(self.container_root, 'compressed_artifacts')
os.mkdir(artifacts)

model_artifacts = os.path.join(artifacts, 'model')
output_artifacts = os.path.join(artifacts, 'output')

s3_model_artifacts = os.path.join(s3_artifacts, 'model')
s3_output_artifacts = os.path.join(s3_artifacts, 'output')
os.mkdir(s3_model_artifacts)
os.mkdir(s3_output_artifacts)
artifact_dirs = [model_artifacts, output_artifacts, compressed_artifacts]
for d in artifact_dirs:
os.mkdir(d)

# Gather the artifacts from all nodes into artifacts/model and artifacts/output
for host in self.hosts:
volumes = compose_data['services'][str(host)]['volumes']

for volume in volumes:
host_dir, container_dir = volume.split(':')
if container_dir == '/opt/ml/model':
sagemaker.local.utils.recursive_copy(host_dir, s3_model_artifacts)
sagemaker.local.utils.recursive_copy(host_dir, model_artifacts)
elif container_dir == '/opt/ml/output':
sagemaker.local.utils.recursive_copy(host_dir, s3_output_artifacts)
sagemaker.local.utils.recursive_copy(host_dir, output_artifacts)

# Tar Artifacts -> model.tar.gz and output.tar.gz
model_files = [os.path.join(model_artifacts, name) for name in os.listdir(model_artifacts)]
output_files = [os.path.join(output_artifacts, name) for name in os.listdir(output_artifacts)]
sagemaker.utils.create_tar_file(model_files, os.path.join(compressed_artifacts, 'model.tar.gz'))
sagemaker.utils.create_tar_file(output_files, os.path.join(compressed_artifacts, 'output.tar.gz'))
Contributor comment: Should that be a function (all 5 lines)? That seems a bit repetitive.
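
One possible shape for such a helper, sketched here only as an illustration of the reviewer's suggestion (not part of this change):

def _tar_directory_contents(source_dir, target_path):
    # Collect every entry in the directory and compress it into a single tarball.
    files = [os.path.join(source_dir, name) for name in os.listdir(source_dir)]
    sagemaker.utils.create_tar_file(files, target_path)

# The repeated block above would then reduce to:
_tar_directory_contents(model_artifacts, os.path.join(compressed_artifacts, 'model.tar.gz'))
_tar_directory_contents(output_artifacts, os.path.join(compressed_artifacts, 'output.tar.gz'))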


if output_data_config['S3OutputPath'] == '':
output_data = 'file://%s' % compressed_artifacts
else:
# Now we just need to move the compressed artifacts to wherever they are required
output_data = sagemaker.local.utils.move_to_destination(
compressed_artifacts,
output_data_config['S3OutputPath'],
job_name,
self.sagemaker_session)

_delete_tree(model_artifacts)
_delete_tree(output_artifacts)

return s3_model_artifacts
return os.path.join(output_data, 'model.tar.gz')
Contributor comment: Since we moved both files, should we return both (model and output) too?

Contributor (author) reply: Not really. SageMaker just returns the S3ModelArtifacts; if you want to look at output.tar.gz you basically have to do a replace in the string. This gets sent directly to the local client describeTrainingJob().
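
Illustrating that reply (the URI is invented), a caller holding the returned model artifact location can derive the output artifact location with a string replace:

model_uri = 's3://my-bucket/prefix/my-job/model.tar.gz'
output_uri = model_uri.replace('model.tar.gz', 'output.tar.gz')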


def write_config_files(self, host, hyperparameters, input_data_config):
"""Write the config files for the training containers.
@@ -235,7 +255,6 @@ def write_config_files(self, host, hyperparameters, input_data_config):
Returns: None

"""

config_path = os.path.join(self.container_root, host, 'input', 'config')

resource_config = {
@@ -261,29 +280,13 @@ def _prepare_training_volumes(self, data_dir, input_data_config, hyperparameters
# mount the local directory to the container. For S3 Data we will download the S3 data
# first.
for channel in input_data_config:
if channel['DataSource'] and 'S3DataSource' in channel['DataSource']:
uri = channel['DataSource']['S3DataSource']['S3Uri']
elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']:
uri = channel['DataSource']['FileDataSource']['FileUri']
else:
raise ValueError('Need channel[\'DataSource\'] to have'
' [\'S3DataSource\'] or [\'FileDataSource\']')

parsed_uri = urlparse(uri)
key = parsed_uri.path.lstrip('/')

uri = channel['DataUri']
channel_name = channel['ChannelName']
channel_dir = os.path.join(data_dir, channel_name)
os.mkdir(channel_dir)

if parsed_uri.scheme == 's3':
bucket_name = parsed_uri.netloc
sagemaker.utils.download_folder(bucket_name, key, channel_dir, self.sagemaker_session)
elif parsed_uri.scheme == 'file':
path = parsed_uri.path
volumes.append(_Volume(path, channel=channel_name))
else:
raise ValueError('Unknown URI scheme {}'.format(parsed_uri.scheme))
data_source = sagemaker.local.data.get_data_source_instance(uri, self.sagemaker_session)
volumes.append(_Volume(data_source.get_root_dir(), channel=channel_name))

# If there is a training script directory and it is a local directory,
# mount it to the container.
@@ -301,25 +304,20 @@ def _prepare_serving_volumes(self, model_location):
volumes = []
host = self.hosts[0]
# Make the model available to the container. If this is a local file just mount it to
# the container as a volume. If it is an S3 location download it and extract the tar file.
# the container as a volume. If it is an S3 location, the DataSource will download it, we
# just need to extract the tar file.
host_dir = os.path.join(self.container_root, host)
os.makedirs(host_dir)

if model_location.startswith('s3'):
container_model_dir = os.path.join(self.container_root, host, 'model')
os.makedirs(container_model_dir)
model_data_source = sagemaker.local.data.get_data_source_instance(
model_location, self.sagemaker_session)

parsed_uri = urlparse(model_location)
filename = os.path.basename(parsed_uri.path)
tar_location = os.path.join(container_model_dir, filename)
sagemaker.utils.download_file(parsed_uri.netloc, parsed_uri.path, tar_location, self.sagemaker_session)
for filename in model_data_source.get_file_list():
if tarfile.is_tarfile(filename):
with tarfile.open(filename) as tar:
tar.extractall(path=model_data_source.get_root_dir())

if tarfile.is_tarfile(tar_location):
with tarfile.open(tar_location) as tar:
tar.extractall(path=container_model_dir)
volumes.append(_Volume(container_model_dir, '/opt/ml/model'))
else:
volumes.append(_Volume(model_location, '/opt/ml/model'))
volumes.append(_Volume(model_data_source.get_root_dir(), '/opt/ml/model'))

return volumes
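
Both hunks above lean on the sagemaker.local.data abstraction; roughly, it behaves as follows (the URI is illustrative, and sagemaker_session is an existing session object): s3:// URIs are downloaded to a local directory while file:// URIs are used in place, and the container simply mounts the returned directory.

data_source = sagemaker.local.data.get_data_source_instance(
    's3://my-bucket/training-data', sagemaker_session)
local_dir = data_source.get_root_dir()    # local directory backing the data
files = data_source.get_file_list()       # individual files, used above to locate tarballs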

@@ -368,7 +366,6 @@ def _generate_compose_file(self, command, additional_volumes=None, additional_en
'networks': {
'sagemaker-local': {'name': 'sagemaker-local'}
}

}

docker_compose_path = os.path.join(self.container_root, DOCKER_COMPOSE_FILENAME)
@@ -469,9 +466,15 @@ def _build_optml_volumes(self, host, subdirs):

return volumes

def _cleanup(self):
# we don't need to cleanup anything at the moment
pass
def _cleanup(self, dirs_to_delete=None):
if dirs_to_delete:
for d in dirs_to_delete:
_delete_tree(d)

# Free the container config files.
for host in self.hosts:
container_config_path = os.path.join(self.container_root, host)
_delete_tree(container_config_path)


class _HostingContainer(Thread):
@@ -610,7 +613,7 @@ def _aws_credentials(session):
'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key))
]
elif not _aws_credentials_available_in_metadata_service():
logger.warn("Using the short-lived AWS credentials found in session. They might expire while running.")
logger.warning("Using the short-lived AWS credentials found in session. They might expire while running.")
return [
'AWS_ACCESS_KEY_ID=%s' % (str(access_key)),
'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key)),
2 changes: 1 addition & 1 deletion src/sagemaker/local/local_session.py
@@ -71,7 +71,7 @@ def create_training_job(self, TrainingJobName, AlgorithmSpecification, InputData
AlgorithmSpecification['TrainingImage'], self.sagemaker_session)
training_job = _LocalTrainingJob(container)
hyperparameters = kwargs['HyperParameters'] if 'HyperParameters' in kwargs else {}
training_job.start(InputDataConfig, hyperparameters, TrainingJobName)
training_job.start(InputDataConfig, OutputDataConfig, hyperparameters, TrainingJobName)

LocalSagemakerClient._training_jobs[TrainingJobName] = training_job

10 changes: 8 additions & 2 deletions src/sagemaker/local/utils.py
@@ -39,25 +39,31 @@ def copy_directory_structure(destination_directory, relative_path):
os.makedirs(destination_directory, relative_path)


def move_to_destination(source, destination, sagemaker_session):
def move_to_destination(source, destination, job_name, sagemaker_session):
"""move source to destination. Can handle uploading to S3

Args:
source (str): root directory to move
destination (str): file:// or s3:// URI that source will be moved to.
sagemaker_session (sagemaker.Session): a sagemaker_session to interact with S3 if needed

Returns:
(str): destination URI
"""
parsed_uri = urlparse(destination)
if parsed_uri.scheme == 'file':
recursive_copy(source, parsed_uri.path)
final_uri = destination
elif parsed_uri.scheme == 's3':
bucket = parsed_uri.netloc
path = parsed_uri.path.strip('/')
path = "%s%s" % (parsed_uri.path.lstrip('/'), job_name)
final_uri = 's3://%s/%s' % (bucket, path)
sagemaker_session.upload_data(source, bucket, path)
else:
raise ValueError('Invalid destination URI, must be s3:// or file://, got: %s' % destination)

shutil.rmtree(source)
return final_uri


def recursive_copy(source, destination):
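A rough usage sketch of the updated helper (bucket, prefix, and job name are invented): for s3:// destinations the job name is now appended to the prefix and the final URI is returned to the caller.

uri = move_to_destination('/tmp/compressed_artifacts',
                          's3://my-bucket/output/',
                          'my-training-job',
                          sagemaker_session)
# uploads the directory contents and returns 's3://my-bucket/output/my-training-job'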