From 95ad797c88dd262821d6eadbff94275725ddfa99 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Mon, 29 Oct 2018 14:58:42 -0700 Subject: [PATCH 1/3] local mode: support output_path. Can be either file:// or s3:// - This also changes the default behavior of local mode to use the SDK provided default S3 bucket if nothing is passed. This makes it easier for customers to create models in SageMaker too since their Model Artifacts will already be a tarfile in S3. --- CHANGELOG.rst | 4 + src/sagemaker/estimator.py | 3 + src/sagemaker/fw_utils.py | 18 +-- src/sagemaker/local/data.py | 7 + src/sagemaker/local/entities.py | 11 +- src/sagemaker/local/image.py | 129 +++++++-------- src/sagemaker/local/local_session.py | 2 +- src/sagemaker/local/utils.py | 10 +- src/sagemaker/session.py | 2 + src/sagemaker/utils.py | 24 +++ tests/integ/test_local_mode.py | 50 +++--- tests/unit/test_estimator.py | 20 +++ tests/unit/test_image.py | 226 ++++++++++++++------------- tests/unit/test_local_entities.py | 2 +- tests/unit/test_local_utils.py | 11 +- tests/unit/test_utils.py | 21 +++ tox.ini | 3 +- 17 files changed, 321 insertions(+), 222 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 516043774b..367b9b2999 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,10 @@ CHANGELOG ========= +1.12.1dev +========= +* enhancement: Local Mode: support output_path. Can be either file:// or s3:// + 1.12.0 ====== diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 7325129a14..d81a0939e3 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -104,6 +104,9 @@ def __init__(self, role, train_instance_count, train_instance_type, self.base_job_name = base_job_name self._current_job_name = None + if (not self.sagemaker_session.local_mode + and output_path and output_path.startswith('file://')): + raise RuntimeError('file:// output paths are only supported in Local Mode') self.output_path = output_path self.output_kms_key = output_kms_key self.latest_training_job = None diff --git a/src/sagemaker/fw_utils.py b/src/sagemaker/fw_utils.py index 5c3d28245d..b35c0ca041 100644 --- a/src/sagemaker/fw_utils.py +++ b/src/sagemaker/fw_utils.py @@ -14,12 +14,10 @@ import os import re -import tarfile -import tempfile from collections import namedtuple from six.moves.urllib.parse import urlparse -from sagemaker.utils import name_from_image +import sagemaker.utils """This module contains utility functions shared across ``Framework`` components.""" @@ -128,14 +126,9 @@ def tar_and_upload_dir(session, bucket, s3_key_prefix, script, directory): s3 = session.resource('s3') key = '{}/{}'.format(s3_key_prefix, 'sourcedir.tar.gz') - with tempfile.TemporaryFile() as f: - with tarfile.open(mode='w:gz', fileobj=f) as t: - for sf in source_files: - # Add all files from the directory into the root of the directory structure of the tar - t.add(sf, arcname=os.path.basename(sf)) - # Need to reset the file descriptor position after writing to prepare for read - f.seek(0) - s3.Object(bucket, key).put(Body=f) + tar_file = sagemaker.utils.create_tar_file(source_files) + s3.Object(bucket, key).upload_file(tar_file) + os.remove(tar_file) return UploadedCode(s3_prefix='s3://{}/{}'.format(bucket, key), script_name=script_name) @@ -226,7 +219,8 @@ def model_code_key_prefix(code_location_key_prefix, model_name, image): Returns: str: the key prefix to be used in uploading code """ - return '/'.join(filter(None, [code_location_key_prefix, model_name or name_from_image(image)])) + training_job_name = 
sagemaker.utils.name_from_image(image) + return '/'.join(filter(None, [code_location_key_prefix, model_name or training_job_name])) def empty_framework_version_warning(default_version): diff --git a/src/sagemaker/local/data.py b/src/sagemaker/local/data.py index 6c90467233..5baa900891 100644 --- a/src/sagemaker/local/data.py +++ b/src/sagemaker/local/data.py @@ -13,6 +13,7 @@ from __future__ import absolute_import import os +import platform import sys import tempfile from abc import ABCMeta @@ -162,6 +163,12 @@ def __init__(self, bucket, prefix, sagemaker_session): root_dir = os.path.abspath(root_dir) working_dir = tempfile.mkdtemp(dir=root_dir) + # Docker cannot mount Mac OS /var folder properly see + # https://forums.docker.com/t/var-folders-isnt-mounted-properly/9600 + # Only apply this workaround if the user didn't provide an alternate storage root dir. + if root_dir is None and platform.system() == 'Darwin': + working_dir = '/private{}'.format(working_dir) + sagemaker.utils.download_folder(bucket, prefix, working_dir, sagemaker_session) self.files = LocalFileDataSource(working_dir) diff --git a/src/sagemaker/local/entities.py b/src/sagemaker/local/entities.py index 199db7d5cb..6d7a4c7a4a 100644 --- a/src/sagemaker/local/entities.py +++ b/src/sagemaker/local/entities.py @@ -46,15 +46,20 @@ def __init__(self, container): self.start_time = None self.end_time = None - def start(self, input_data_config, hyperparameters, job_name): + def start(self, input_data_config, output_data_config, hyperparameters, job_name): for channel in input_data_config: if channel['DataSource'] and 'S3DataSource' in channel['DataSource']: data_distribution = channel['DataSource']['S3DataSource']['S3DataDistributionType'] + data_uri = channel['DataSource']['S3DataSource']['S3Uri'] elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']: data_distribution = channel['DataSource']['FileDataSource']['FileDataDistributionType'] + data_uri = channel['DataSource']['FileDataSource']['FileUri'] else: raise ValueError('Need channel[\'DataSource\'] to have [\'S3DataSource\'] or [\'FileDataSource\']') + # use a single Data URI - this makes handling S3 and File Data easier down the stack + channel['DataUri'] = data_uri + if data_distribution != 'FullyReplicated': raise RuntimeError('DataDistribution: %s is not currently supported in Local Mode' % data_distribution) @@ -62,7 +67,7 @@ def start(self, input_data_config, hyperparameters, job_name): self.start = datetime.datetime.now() self.state = self._TRAINING - self.model_artifacts = self.container.train(input_data_config, hyperparameters, job_name) + self.model_artifacts = self.container.train(input_data_config, output_data_config, hyperparameters, job_name) self.end = datetime.datetime.now() self.state = self._COMPLETED @@ -298,7 +303,7 @@ def _perform_batch_inference(self, input_data, output_data, **kwargs): if 'AssembleWith' in output_data and output_data['AssembleWith'] == 'Line': f.write(b'\n') - move_to_destination(working_dir, output_data['S3OutputPath'], self.local_session) + move_to_destination(working_dir, output_data['S3OutputPath'], self.name, self.local_session) self.container.stop_serving() diff --git a/src/sagemaker/local/image.py b/src/sagemaker/local/image.py index 67f488fae4..1c3e1a9b9f 100644 --- a/src/sagemaker/local/image.py +++ b/src/sagemaker/local/image.py @@ -33,6 +33,9 @@ import yaml import sagemaker +import sagemaker.local.data +import sagemaker.local.utils +import sagemaker.utils CONTAINER_PREFIX = 'algo' DOCKER_COMPOSE_FILENAME = 
'docker-compose.yaml' @@ -78,7 +81,7 @@ def __init__(self, instance_type, instance_count, image, sagemaker_session=None) self.container_root = None self.container = None - def train(self, input_data_config, hyperparameters, job_name): + def train(self, input_data_config, output_data_config, hyperparameters, job_name): """Run a training job locally using docker-compose. Args: input_data_config (dict): The Input Data Configuration, this contains data such as the @@ -126,23 +129,17 @@ def train(self, input_data_config, hyperparameters, job_name): msg = "Failed to run: %s, %s" % (compose_command, str(e)) raise RuntimeError(msg) - s3_artifacts = self.retrieve_artifacts(compose_data) + artifacts = self.retrieve_artifacts(compose_data, output_data_config, job_name) # free up the training data directory as it may contain # lots of data downloaded from S3. This doesn't delete any local # data that was just mounted to the container. - _delete_tree(data_dir) - _delete_tree(shared_dir) - # Also free the container config files. - for host in self.hosts: - container_config_path = os.path.join(self.container_root, host) - _delete_tree(container_config_path) - - self._cleanup() - # Print our Job Complete line to have a simmilar experience to training on SageMaker where you + dirs_to_delete = [data_dir, shared_dir] + self._cleanup(dirs_to_delete) + # Print our Job Complete line to have a similar experience to training on SageMaker where you # see this line at the end. print('===== Job Complete =====') - return s3_artifacts + return artifacts def serve(self, model_dir, environment): """Host a local endpoint using docker-compose. @@ -188,7 +185,7 @@ def stop_serving(self): # for serving we can delete everything in the container root. _delete_tree(self.container_root) - def retrieve_artifacts(self, compose_data): + def retrieve_artifacts(self, compose_data, output_data_config, job_name): """Get the model artifacts from all the container nodes. Used after training completes to gather the data from all the individual containers. As the @@ -201,26 +198,49 @@ def retrieve_artifacts(self, compose_data): Returns: Local path to the collected model artifacts. """ - # Grab the model artifacts from all the Nodes. 
- s3_artifacts = os.path.join(self.container_root, 's3_artifacts') - os.mkdir(s3_artifacts) + # We need a directory to store the artfiacts from all the nodes + # and another one to contained the compressed final artifacts + artifacts = os.path.join(self.container_root, 'artifacts') + compressed_artifacts = os.path.join(self.container_root, 'compressed_artifacts') + os.mkdir(artifacts) + + model_artifacts = os.path.join(artifacts, 'model') + output_artifacts = os.path.join(artifacts, 'output') - s3_model_artifacts = os.path.join(s3_artifacts, 'model') - s3_output_artifacts = os.path.join(s3_artifacts, 'output') - os.mkdir(s3_model_artifacts) - os.mkdir(s3_output_artifacts) + artifact_dirs = [model_artifacts, output_artifacts, compressed_artifacts] + for d in artifact_dirs: + os.mkdir(d) + # Gather the artifacts from all nodes into artifacts/model and artifacts/output for host in self.hosts: volumes = compose_data['services'][str(host)]['volumes'] - for volume in volumes: host_dir, container_dir = volume.split(':') if container_dir == '/opt/ml/model': - sagemaker.local.utils.recursive_copy(host_dir, s3_model_artifacts) + sagemaker.local.utils.recursive_copy(host_dir, model_artifacts) elif container_dir == '/opt/ml/output': - sagemaker.local.utils.recursive_copy(host_dir, s3_output_artifacts) + sagemaker.local.utils.recursive_copy(host_dir, output_artifacts) + + # Tar Artifacts -> model.tar.gz and output.tar.gz + model_files = [os.path.join(model_artifacts, name) for name in os.listdir(model_artifacts)] + output_files = [os.path.join(output_artifacts, name) for name in os.listdir(output_artifacts)] + sagemaker.utils.create_tar_file(model_files, os.path.join(compressed_artifacts, 'model.tar.gz')) + sagemaker.utils.create_tar_file(output_files, os.path.join(compressed_artifacts, 'output.tar.gz')) + + if output_data_config['S3OutputPath'] == '': + output_data = 'file://%s' % compressed_artifacts + else: + # Now we just need to move the compressed artifacts to wherever they are required + output_data = sagemaker.local.utils.move_to_destination( + compressed_artifacts, + output_data_config['S3OutputPath'], + job_name, + self.sagemaker_session) + + _delete_tree(model_artifacts) + _delete_tree(output_artifacts) - return s3_model_artifacts + return os.path.join(output_data, 'model.tar.gz') def write_config_files(self, host, hyperparameters, input_data_config): """Write the config files for the training containers. @@ -235,7 +255,6 @@ def write_config_files(self, host, hyperparameters, input_data_config): Returns: None """ - config_path = os.path.join(self.container_root, host, 'input', 'config') resource_config = { @@ -261,29 +280,13 @@ def _prepare_training_volumes(self, data_dir, input_data_config, hyperparameters # mount the local directory to the container. For S3 Data we will download the S3 data # first. 
for channel in input_data_config: - if channel['DataSource'] and 'S3DataSource' in channel['DataSource']: - uri = channel['DataSource']['S3DataSource']['S3Uri'] - elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']: - uri = channel['DataSource']['FileDataSource']['FileUri'] - else: - raise ValueError('Need channel[\'DataSource\'] to have' - ' [\'S3DataSource\'] or [\'FileDataSource\']') - - parsed_uri = urlparse(uri) - key = parsed_uri.path.lstrip('/') - + uri = channel['DataUri'] channel_name = channel['ChannelName'] channel_dir = os.path.join(data_dir, channel_name) os.mkdir(channel_dir) - if parsed_uri.scheme == 's3': - bucket_name = parsed_uri.netloc - sagemaker.utils.download_folder(bucket_name, key, channel_dir, self.sagemaker_session) - elif parsed_uri.scheme == 'file': - path = parsed_uri.path - volumes.append(_Volume(path, channel=channel_name)) - else: - raise ValueError('Unknown URI scheme {}'.format(parsed_uri.scheme)) + data_source = sagemaker.local.data.get_data_source_instance(uri, self.sagemaker_session) + volumes.append(_Volume(data_source.get_root_dir(), channel=channel_name)) # If there is a training script directory and it is a local directory, # mount it to the container. @@ -301,25 +304,20 @@ def _prepare_serving_volumes(self, model_location): volumes = [] host = self.hosts[0] # Make the model available to the container. If this is a local file just mount it to - # the container as a volume. If it is an S3 location download it and extract the tar file. + # the container as a volume. If it is an S3 location, the DataSource will download it, we + # just need to extract the tar file. host_dir = os.path.join(self.container_root, host) os.makedirs(host_dir) - if model_location.startswith('s3'): - container_model_dir = os.path.join(self.container_root, host, 'model') - os.makedirs(container_model_dir) + model_data_source = sagemaker.local.data.get_data_source_instance( + model_location, self.sagemaker_session) - parsed_uri = urlparse(model_location) - filename = os.path.basename(parsed_uri.path) - tar_location = os.path.join(container_model_dir, filename) - sagemaker.utils.download_file(parsed_uri.netloc, parsed_uri.path, tar_location, self.sagemaker_session) + for filename in model_data_source.get_file_list(): + if tarfile.is_tarfile(filename): + with tarfile.open(filename) as tar: + tar.extractall(path=model_data_source.get_root_dir()) - if tarfile.is_tarfile(tar_location): - with tarfile.open(tar_location) as tar: - tar.extractall(path=container_model_dir) - volumes.append(_Volume(container_model_dir, '/opt/ml/model')) - else: - volumes.append(_Volume(model_location, '/opt/ml/model')) + volumes.append(_Volume(model_data_source.get_root_dir(), '/opt/ml/model')) return volumes @@ -368,7 +366,6 @@ def _generate_compose_file(self, command, additional_volumes=None, additional_en 'networks': { 'sagemaker-local': {'name': 'sagemaker-local'} } - } docker_compose_path = os.path.join(self.container_root, DOCKER_COMPOSE_FILENAME) @@ -469,9 +466,15 @@ def _build_optml_volumes(self, host, subdirs): return volumes - def _cleanup(self): - # we don't need to cleanup anything at the moment - pass + def _cleanup(self, dirs_to_delete=None): + if dirs_to_delete: + for d in dirs_to_delete: + _delete_tree(d) + + # Free the container config files. 
+ for host in self.hosts: + container_config_path = os.path.join(self.container_root, host) + _delete_tree(container_config_path) class _HostingContainer(Thread): @@ -610,7 +613,7 @@ def _aws_credentials(session): 'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key)) ] elif not _aws_credentials_available_in_metadata_service(): - logger.warn("Using the short-lived AWS credentials found in session. They might expire while running.") + logger.warning("Using the short-lived AWS credentials found in session. They might expire while running.") return [ 'AWS_ACCESS_KEY_ID=%s' % (str(access_key)), 'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key)), diff --git a/src/sagemaker/local/local_session.py b/src/sagemaker/local/local_session.py index 8361e18dab..134d90ffe3 100644 --- a/src/sagemaker/local/local_session.py +++ b/src/sagemaker/local/local_session.py @@ -71,7 +71,7 @@ def create_training_job(self, TrainingJobName, AlgorithmSpecification, InputData AlgorithmSpecification['TrainingImage'], self.sagemaker_session) training_job = _LocalTrainingJob(container) hyperparameters = kwargs['HyperParameters'] if 'HyperParameters' in kwargs else {} - training_job.start(InputDataConfig, hyperparameters, TrainingJobName) + training_job.start(InputDataConfig, OutputDataConfig, hyperparameters, TrainingJobName) LocalSagemakerClient._training_jobs[TrainingJobName] = training_job diff --git a/src/sagemaker/local/utils.py b/src/sagemaker/local/utils.py index 355c4f77d2..a79f36df78 100644 --- a/src/sagemaker/local/utils.py +++ b/src/sagemaker/local/utils.py @@ -39,25 +39,31 @@ def copy_directory_structure(destination_directory, relative_path): os.makedirs(destination_directory, relative_path) -def move_to_destination(source, destination, sagemaker_session): +def move_to_destination(source, destination, job_name, sagemaker_session): """move source to destination. Can handle uploading to S3 Args: source (str): root directory to move destination (str): file:// or s3:// URI that source will be moved to. sagemaker_session (sagemaker.Session): a sagemaker_session to interact with S3 if needed + + Returns: + (str): destination URI """ parsed_uri = urlparse(destination) if parsed_uri.scheme == 'file': recursive_copy(source, parsed_uri.path) + final_uri = destination elif parsed_uri.scheme == 's3': bucket = parsed_uri.netloc - path = parsed_uri.path.strip('/') + path = "%s%s" % (parsed_uri.path.lstrip('/'), job_name) + final_uri = 's3://%s/%s' % (bucket, path) sagemaker_session.upload_data(source, bucket, path) else: raise ValueError('Invalid destination URI, must be s3:// or file://, got: %s' % destination) shutil.rmtree(source) + return final_uri def recursive_copy(source, destination): diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 66e3d8ec2f..f339e64f58 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -134,6 +134,7 @@ def upload_data(self, path, bucket=None, key_prefix='data'): files = [] key_suffix = None if os.path.isdir(path): + print('%s is dir!' 
% path) for dirpath, dirnames, filenames in os.walk(path): for name in filenames: local_path = os.path.join(dirpath, name) @@ -150,6 +151,7 @@ def upload_data(self, path, bucket=None, key_prefix='data'): s3 = self.boto_session.resource('s3') for local_path, s3_key in files: + print("%s -> %s" % (local_path, s3_key)) s3.Object(bucket, s3_key).upload_file(local_path) s3_uri = 's3://{}/{}'.format(bucket, key_prefix) diff --git a/src/sagemaker/utils.py b/src/sagemaker/utils.py index 160b78a1a4..9cd4bc0296 100644 --- a/src/sagemaker/utils.py +++ b/src/sagemaker/utils.py @@ -16,6 +16,8 @@ import os import re import sys +import tarfile +import tempfile import time from datetime import datetime @@ -242,6 +244,28 @@ def download_folder(bucket_name, prefix, target, sagemaker_session): obj.download_file(file_path) +def create_tar_file(source_files, target=None): + """Create a tar file containing all the source_files + + Args: + source_files (List[str]): List of file paths that will be contained in the tar file + + Returns: + (str): path to created tar file + + """ + if target: + filename = target + else: + _, filename = tempfile.mkstemp() + + with tarfile.open(filename, mode='w:gz') as t: + for sf in source_files: + # Add all files from the directory into the root of the directory structure of the tar + t.add(sf, arcname=os.path.basename(sf)) + return filename + + def download_file(bucket_name, path, target, sagemaker_session): """Download a Single File from S3 into a local path diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index ace201a7fa..b51c844c8e 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -21,9 +21,8 @@ import pytest from sagemaker.local import LocalSession, LocalSagemakerRuntimeClient, LocalSagemakerClient -from sagemaker.mxnet import MXNet, MXNetModel +from sagemaker.mxnet import MXNet from sagemaker.tensorflow import TensorFlow -from sagemaker.fw_utils import tar_and_upload_dir from tests.integ import DATA_DIR, PYTHON_VERSION from tests.integ.timeout import timeout @@ -58,21 +57,24 @@ def _initialize(self, boto_session, sagemaker_client, sagemaker_runtime_client): @pytest.fixture(scope='module') def mxnet_model(sagemaker_local_session): - script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') - data_path = os.path.join(DATA_DIR, 'mxnet_mnist') + def _create_model(output_path): + script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') + data_path = os.path.join(DATA_DIR, 'mxnet_mnist') - mx = MXNet(entry_point=script_path, role='SageMakerRole', - train_instance_count=1, train_instance_type='local', - sagemaker_session=sagemaker_local_session) + mx = MXNet(entry_point=script_path, role='SageMakerRole', + train_instance_count=1, train_instance_type='local', + output_path=output_path, + sagemaker_session=sagemaker_local_session) - train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), - key_prefix='integ-test-data/mxnet_mnist/train') - test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), - key_prefix='integ-test-data/mxnet_mnist/test') + train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), + key_prefix='integ-test-data/mxnet_mnist/train') + test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), + key_prefix='integ-test-data/mxnet_mnist/test') - mx.fit({'train': train_input, 'test': test_input}) - model = mx.create_model(1) - return model + mx.fit({'train': train_input, 'test': 
test_input}) + model = mx.create_model(1) + return model + return _create_model @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") @@ -259,15 +261,9 @@ def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model): local_mode_lock_fd = open(LOCK_PATH, 'w') local_mode_lock = local_mode_lock_fd.fileno() - model_data = mxnet_model.model_data - boto_session = sagemaker_local_session.boto_session - default_bucket = sagemaker_local_session.default_bucket() - uploaded_data = tar_and_upload_dir(boto_session, default_bucket, - 'test_mxnet_local_mode', '', model_data) - - s3_model = MXNetModel(model_data=uploaded_data.s3_prefix, role='SageMakerRole', - entry_point=mxnet_model.entry_point, image=mxnet_model.image, - sagemaker_session=sagemaker_local_session) + path = 's3://%s' % sagemaker_local_session.default_bucket() + s3_model = mxnet_model(path) + s3_model.sagemaker_session = sagemaker_local_session predictor = None try: @@ -285,7 +281,7 @@ def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model): fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) -def test_local_mode_serving_from_local_model(sagemaker_local_session, mxnet_model): +def test_local_mode_serving_from_local_model(tmpdir, sagemaker_local_session, mxnet_model): local_mode_lock_fd = open(LOCK_PATH, 'w') local_mode_lock = local_mode_lock_fd.fileno() predictor = None @@ -295,8 +291,10 @@ def test_local_mode_serving_from_local_model(sagemaker_local_session, mxnet_mode # to allow concurrent test execution. The serving test is really fast so it still # makes sense to allow this behavior. fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - mxnet_model.sagemaker_session = sagemaker_local_session - predictor = mxnet_model.deploy(initial_instance_count=1, instance_type='local') + path = 'file://%s' % (str(tmpdir)) + model = mxnet_model(path) + model.sagemaker_session = sagemaker_local_session + predictor = model.deploy(initial_instance_count=1, instance_type='local') data = numpy.zeros(shape=(1, 1, 28, 28)) predictor.predict(data) finally: diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index 40eacfb6d3..b2cc2cb542 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -867,4 +867,24 @@ def test_distributed_gpu_local_mode(LocalSession): with pytest.raises(RuntimeError): Estimator(IMAGE_NAME, ROLE, 3, 'local_gpu', output_path=OUTPUT_PATH) + +@patch('sagemaker.estimator.LocalSession') +def test_local_mode_file_output_path(local_session_class): + local_session = Mock() + local_session.local_mode = True + local_session_class.return_value = local_session + + e = Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, 'local', output_path='file:///tmp/model/') + assert e.output_path == 'file:///tmp/model/' + + +@patch('sagemaker.estimator.Session') +def test_file_output_path_not_supported_outside_local_mode(session_class): + session = Mock() + session.local_mode = False + session_class.return_value = session + + with pytest.raises(RuntimeError): + Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, INSTANCE_TYPE, output_path='file:///tmp/model') + ################################################################################# diff --git a/tests/unit/test_image.py b/tests/unit/test_image.py index ac4cab8ae2..3a4da32784 100644 --- a/tests/unit/test_image.py +++ b/tests/unit/test_image.py @@ -22,10 +22,11 @@ import json import os import subprocess +import tarfile import pytest import yaml -from mock import call, patch, Mock, MagicMock +from mock 
import patch, Mock, MagicMock import sagemaker from sagemaker.local.image import _SageMakerContainer, _aws_credentials @@ -37,6 +38,7 @@ INPUT_DATA_CONFIG = [ { 'ChannelName': 'a', + 'DataUri': 'file:///tmp/source1', 'DataSource': { 'FileDataSource': { 'FileDataDistributionType': 'FullyReplicated', @@ -46,6 +48,7 @@ }, { 'ChannelName': 'b', + 'DataUri': 's3://my-own-bucket/prefix', 'DataSource': { 'S3DataSource': { 'S3DataDistributionType': 'FullyReplicated', @@ -55,11 +58,15 @@ } } ] + +OUTPUT_DATA_CONFIG = { + 'S3OutputPath': '' +} + HYPERPARAMETERS = {'a': 1, 'b': json.dumps('bee'), 'sagemaker_submit_directory': json.dumps('s3://my_bucket/code')} - LOCAL_CODE_HYPERPARAMETERS = {'a': 1, 'b': 2, 'sagemaker_submit_directory': json.dumps('file:///tmp/code')} @@ -81,7 +88,6 @@ def sagemaker_session(): @patch('sagemaker.local.local_session.LocalSession') def test_write_config_file(LocalSession, tmpdir): - sagemaker_container = _SageMakerContainer('local', 2, 'my-image') sagemaker_container.container_root = str(tmpdir.mkdir('container-root')) host = "algo-1" @@ -144,50 +150,59 @@ def test_retrieve_artifacts(LocalSession, tmpdir): } } - dirs1 = ['model', 'model/data'] - dirs2 = ['model', 'model/data', 'model/tmp'] - dirs3 = ['output', 'output/data'] - dirs4 = ['output', 'output/data', 'output/log'] - - files1 = ['model/data/model.json', 'model/data/variables.csv'] - files2 = ['model/data/model.json', 'model/data/variables2.csv', 'model/tmp/something-else.json'] - files3 = ['output/data/loss.json', 'output/data/accuracy.json'] - files4 = ['output/data/loss.json', 'output/data/accuracy2.json', 'output/log/warnings.txt'] - - expected = ['model', 'model/data/', 'model/data/model.json', 'model/data/variables.csv', - 'model/data/variables2.csv', 'model/tmp/something-else.json', 'output', 'output/data', 'output/log', - 'output/data/loss.json', 'output/data/accuracy.json', 'output/data/accuracy2.json', - 'output/log/warnings.txt'] - - for d in dirs1: - os.mkdir(os.path.join(volume1, d)) - for d in dirs2: - os.mkdir(os.path.join(volume2, d)) - for d in dirs3: - os.mkdir(os.path.join(volume1, d)) - for d in dirs4: - os.mkdir(os.path.join(volume2, d)) + dirs = [ + ('model', volume1), ('model/data', volume1), + ('model', volume2), ('model/data', volume2), ('model/tmp', volume2), + ('output', volume1), ('output/data', volume1), + ('output', volume2), ('output/data', volume2), ('output/log', volume2) + ] + + files = [ + ('model/data/model.json', volume1), ('model/data/variables.csv', volume1), + ('model/data/model.json', volume2), ('model/data/variables2.csv', volume2), + ('model/tmp/something-else.json', volume2), + ('output/data/loss.json', volume1), ('output/data/accuracy.json', volume1), + ('output/data/loss.json', volume2), ('output/data/accuracy2.json', volume2), + ('output/log/warnings.txt', volume2) + ] + + expected_model = ['data', 'data/model.json', 'data/variables.csv', + 'data/variables2.csv', 'tmp/something-else.json'] + expected_output = ['data', 'log', 'data/loss.json', 'data/accuracy.json', 'data/accuracy2.json', + 'log/warnings.txt'] + + for d, volume in dirs: + os.mkdir(os.path.join(volume, d)) # create all the files - for f in files1: - open(os.path.join(volume1, f), 'a').close() - for f in files2: - open(os.path.join(volume2, f), 'a').close() - for f in files3: - open(os.path.join(volume1, f), 'a').close() - for f in files4: - open(os.path.join(volume2, f), 'a').close() + for f, volume in files: + open(os.path.join(volume, f), 'a').close() - s3_model_artifacts = 
sagemaker_container.retrieve_artifacts(compose_data) - s3_artifacts = os.path.dirname(s3_model_artifacts) + output_path = str(tmpdir.mkdir('exported_files')) + output_data_config = { + 'S3OutputPath': 'file://%s' % output_path + } - for f in expected: - assert set(os.listdir(s3_artifacts)) == set(['model', 'output']) - assert os.path.exists(os.path.join(s3_artifacts, f)) + model_artifacts = sagemaker_container.retrieve_artifacts( + compose_data, output_data_config, sagemaker_session).replace('file://', '') + artifacts = os.path.dirname(model_artifacts) + # we have both the tar files + assert set(os.listdir(artifacts)) == {'model.tar.gz', 'output.tar.gz'} + + # check that the tar files contain what we expect + tar = tarfile.open(os.path.join(output_path, 'model.tar.gz')) + model_tar_files = [m.name for m in tar.getmembers()] + for f in expected_model: + assert f in model_tar_files + + tar = tarfile.open(os.path.join(output_path, 'output.tar.gz')) + output_tar_files = [m.name for m in tar.getmembers()] + for f in expected_output: + assert f in output_tar_files -def test_stream_output(): +def test_stream_output(): # it should raise an exception if the command fails with pytest.raises(RuntimeError): p = subprocess.Popen(['ls', '/some/unknown/path'], @@ -203,7 +218,6 @@ def test_stream_output(): def test_check_output(): - with pytest.raises(Exception): sagemaker.local.image._check_output(['ls', '/some/unknown/path']) @@ -216,14 +230,12 @@ def test_check_output(): assert output == msg -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') +@patch('sagemaker.local.local_session.LocalSession', Mock()) +@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) @patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, - tmpdir, sagemaker_session): - +def test_train(popen, tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -231,14 +243,10 @@ def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, instance_count = 2 image = 'my-image' sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) - - channel_dir = os.path.join(directories[1], 'b') - download_folder_calls = [call('my-own-bucket', 'prefix', channel_dir, sagemaker_session)] - download_folder.assert_has_calls(download_folder_calls) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') - call_args = popen.call_args[0][0] assert call_args is not None @@ -260,20 +268,19 @@ def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, assert os.path.exists(os.path.join(sagemaker_container.container_root, 'output/data')) -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def 
test_train_with_hyperparameters_without_job_name(download_folder, _cleanup, _stream_output, LocalSession, tmpdir): - +@patch('sagemaker.local.local_session.LocalSession', Mock()) +@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +def test_train_with_hyperparameters_without_job_name(tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): - instance_count = 2 image = 'my-image' - sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=LocalSession) - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') @@ -283,12 +290,12 @@ def test_train_with_hyperparameters_without_job_name(download_folder, _cleanup, assert 'TRAINING_JOB_NAME={}'.format(TRAINING_JOB_NAME) in config['services'][h]['environment'] -@patch('sagemaker.local.local_session.LocalSession') +@patch('sagemaker.local.local_session.LocalSession', Mock()) @patch('sagemaker.local.image._stream_output', side_effect=RuntimeError('this is expected')) -@patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train_error(download_folder, _cleanup, popen, _stream_output, LocalSession, tmpdir, sagemaker_session): +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +@patch('subprocess.Popen', Mock()) +def test_train_error(_stream_output, tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -297,18 +304,18 @@ def test_train_error(download_folder, _cleanup, popen, _stream_output, LocalSess sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) with pytest.raises(RuntimeError) as e: - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) assert 'this is expected' in str(e) -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') -@patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train_local_code(download_folder, _cleanup, popen, _stream_output, - _local_session, tmpdir, sagemaker_session): +@patch('sagemaker.local.local_session.LocalSession', Mock()) +@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +@patch('subprocess.Popen', Mock()) +def test_train_local_code(tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with 
patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -317,7 +324,8 @@ def test_train_local_code(download_folder, _cleanup, popen, _stream_output, sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) - sagemaker_container.train(INPUT_DATA_CONFIG, LOCAL_CODE_HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, LOCAL_CODE_HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') @@ -345,7 +353,7 @@ def test_container_has_gpu_support(tmpdir, sagemaker_session): assert docker_host['runtime'] == 'nvidia' -def test_container_does_not_enable_nvidia_docker_for_cpu_containers(tmpdir, sagemaker_session): +def test_container_does_not_enable_nvidia_docker_for_cpu_containers(sagemaker_session): instance_count = 1 image = 'my-image' sagemaker_container = _SageMakerContainer('local', instance_count, image, @@ -355,14 +363,13 @@ def test_container_does_not_enable_nvidia_docker_for_cpu_containers(tmpdir, sage assert 'runtime' not in docker_host -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = _SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) environment = { @@ -382,14 +389,13 @@ def test_serve(up, copy, copytree, tmpdir, sagemaker_session): assert config['services'][h]['command'] == 'serve' -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve_local_code(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve_local_code(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = _SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) environment = { @@ -413,14 +419,13 @@ def test_serve_local_code(up, copy, copytree, tmpdir, sagemaker_session): assert '%s:/opt/ml/code' % '/tmp/code' in volumes -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve_local_code_no_env(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve_local_code_no_env(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = 
_SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) sagemaker_container.serve('/some/model/path', {}) @@ -435,36 +440,41 @@ def test_serve_local_code_no_env(up, copy, copytree, tmpdir, sagemaker_session): assert config['services'][h]['command'] == 'serve' -@patch('sagemaker.utils.download_file') +@patch('sagemaker.local.data.get_data_source_instance') @patch('tarfile.is_tarfile') @patch('tarfile.open', MagicMock()) @patch('os.makedirs', Mock()) -def test_prepare_serving_volumes_with_s3_model(is_tarfile, download_file, sagemaker_session): - +def test_prepare_serving_volumes_with_s3_model(is_tarfile, get_data_source_instance, sagemaker_session): sagemaker_container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session) sagemaker_container.container_root = '/tmp/container_root' - container_model_dir = os.path.join('/tmp/container_root/', sagemaker_container.hosts[0], 'model') + s3_data_source = Mock() + s3_data_source.get_root_dir.return_value = '/tmp/downloaded/data/' + s3_data_source.get_file_list.return_value = ['/tmp/downloaded/data/my_model.tar.gz'] + get_data_source_instance.return_value = s3_data_source is_tarfile.return_value = True volumes = sagemaker_container._prepare_serving_volumes('s3://bucket/my_model.tar.gz') - - tar_location = os.path.join(container_model_dir, 'my_model.tar.gz') - download_file.assert_called_with('bucket', '/my_model.tar.gz', tar_location, sagemaker_session) - is_tarfile.assert_called_with(tar_location) + is_tarfile.assert_called_with('/tmp/downloaded/data/my_model.tar.gz') assert len(volumes) == 1 assert volumes[0].container_dir == '/opt/ml/model' - assert volumes[0].host_dir == container_model_dir + assert volumes[0].host_dir == '/tmp/downloaded/data/' +@patch('sagemaker.local.data.get_data_source_instance') +@patch('tarfile.is_tarfile', Mock(return_value=False)) @patch('os.makedirs', Mock()) -def test_prepare_serving_volumes_with_local_model(sagemaker_session): - +def test_prepare_serving_volumes_with_local_model(get_data_source_instance, sagemaker_session): sagemaker_container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session) sagemaker_container.container_root = '/tmp/container_root' - volumes = sagemaker_container._prepare_serving_volumes('/path/to/my_model') + local_file_data_source = Mock() + local_file_data_source.get_root_dir.return_value = '/path/to/my_model' + local_file_data_source.get_file_list.return_value = ['/path/to/my_model/model'] + get_data_source_instance.return_value = local_file_data_source + + volumes = sagemaker_container._prepare_serving_volumes('file:///path/to/my_model') assert len(volumes) == 1 assert volumes[0].container_dir == '/opt/ml/model' diff --git a/tests/unit/test_local_entities.py b/tests/unit/test_local_entities.py index 86180951dd..6261031bdf 100644 --- a/tests/unit/test_local_entities.py +++ b/tests/unit/test_local_entities.py @@ -157,7 +157,7 @@ def test_local_transform_job_perform_batch_inference(get_config_value, move_to_d local_transform_job._perform_batch_inference(input_data, output_data, **transform_kwargs) - dir, output, session = move_to_destination.call_args[0] + dir, output, job_name, session = move_to_destination.call_args[0] assert output == 's3://bucket/output' output_files = os.listdir(dir) assert len(output_files) == 2 diff --git a/tests/unit/test_local_utils.py b/tests/unit/test_local_utils.py index a4f2bb35fb..5f48b68602 100644 --- a/tests/unit/test_local_utils.py +++ b/tests/unit/test_local_utils.py @@ -15,7 
+15,7 @@ import pytest from mock import patch, Mock -import sagemaker +import sagemaker.local.utils BUCKET_NAME = 'some-nice-bucket' @@ -24,14 +24,15 @@ @patch('sagemaker.local.utils.recursive_copy') def test_move_to_destination(recursive_copy): # local files will just be recursive copied - sagemaker.local.utils.move_to_destination('/tmp/data', 'file:///target/dir/', None) + sagemaker.local.utils.move_to_destination('/tmp/data', 'file:///target/dir/', 'job', None) recursive_copy.assert_called() # s3 destination will upload to S3 sms = Mock() - sagemaker.local.utils.move_to_destination('/tmp/data', 's3://bucket/path', sms) + sagemaker.local.utils.move_to_destination('/tmp/data', 's3://bucket/path', 'job', sms) sms.upload_data.assert_called() - # weird destination, should raise an exception + +def test_move_to_destination_illegal_destination(): with pytest.raises(ValueError): - sagemaker.local.utils.move_to_destination('/tmp/data', 'ftp://ftp/in/2018', None) + sagemaker.local.utils.move_to_destination('/tmp/data', 'ftp://ftp/in/2018', 'job', None) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 14dcff4a96..0a1521fb54 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -252,3 +252,24 @@ def test_download_file(): '/tmp/file.tar.gz', session) bucket_mock.download_file.assert_called_with('prefix/path/file.tar.gz', '/tmp/file.tar.gz') + + +@patch('tarfile.open') +def test_create_tar_file_with_provided_path(open): + open.return_value = open + open.__enter__ = Mock() + open.__exit__ = Mock(return_value=None) + file_list = ['/tmp/a', '/tmp/b'] + path = sagemaker.utils.create_tar_file(file_list, target='/my/custom/path.tar.gz') + assert path == '/my/custom/path.tar.gz' + + +@patch('tarfile.open') +@patch('tempfile.mkstemp', Mock(return_value=(None, '/auto/generated/path'))) +def test_create_tar_file_with_auto_generated_path(open): + open.return_value = open + open.__enter__ = Mock() + open.__exit__ = Mock(return_value=None) + file_list = ['/tmp/a', '/tmp/b'] + path = sagemaker.utils.create_tar_file(file_list) + assert path == '/auto/generated/path' diff --git a/tox.ini b/tox.ini index 45847d1024..9660734fd0 100644 --- a/tox.ini +++ b/tox.ini @@ -39,7 +39,8 @@ ignore = FI54, FI55, FI56, - FI57 + FI57, + W503 require-code = True From 6c30025fb61e810fec937b23a10d6d4113d62325 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Fri, 2 Nov 2018 10:43:36 -0700 Subject: [PATCH 2/3] Remove stray print statements --- src/sagemaker/session.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index f339e64f58..66e3d8ec2f 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -134,7 +134,6 @@ def upload_data(self, path, bucket=None, key_prefix='data'): files = [] key_suffix = None if os.path.isdir(path): - print('%s is dir!' 
% path) for dirpath, dirnames, filenames in os.walk(path): for name in filenames: local_path = os.path.join(dirpath, name) @@ -151,7 +150,6 @@ def upload_data(self, path, bucket=None, key_prefix='data'): s3 = self.boto_session.resource('s3') for local_path, s3_key in files: - print("%s -> %s" % (local_path, s3_key)) s3.Object(bucket, s3_key).upload_file(local_path) s3_uri = 's3://{}/{}'.format(bucket, key_prefix) From f441e4d0cc02b018b21105205e30241c699223a5 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Fri, 2 Nov 2018 14:45:55 -0700 Subject: [PATCH 3/3] Fix #451 while we are touching local mode train() --- src/sagemaker/local/image.py | 12 ++++++-- tests/unit/test_image.py | 54 ++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/sagemaker/local/image.py b/src/sagemaker/local/image.py index 1c3e1a9b9f..e950893e7b 100644 --- a/src/sagemaker/local/image.py +++ b/src/sagemaker/local/image.py @@ -262,9 +262,15 @@ def write_config_files(self, host, hyperparameters, input_data_config): 'hosts': self.hosts } - json_input_data_config = { - c['ChannelName']: {'ContentType': 'application/octet-stream'} for c in input_data_config - } + print(input_data_config) + json_input_data_config = {} + for c in input_data_config: + channel_name = c['ChannelName'] + json_input_data_config[channel_name] = { + 'TrainingInputMode': 'File' + } + if 'ContentType' in c: + json_input_data_config[channel_name]['ContentType'] = c['ContentType'] _write_json_file(os.path.join(config_path, 'hyperparameters.json'), hyperparameters) _write_json_file(os.path.join(config_path, 'resourceconfig.json'), resource_config) diff --git a/tests/unit/test_image.py b/tests/unit/test_image.py index 3a4da32784..1d5f62b8a2 100644 --- a/tests/unit/test_image.py +++ b/tests/unit/test_image.py @@ -126,6 +126,60 @@ def test_write_config_file(LocalSession, tmpdir): assert channel['ChannelName'] in input_data_config_data +@patch('sagemaker.local.local_session.LocalSession') +def test_write_config_files_input_content_type(LocalSession, tmpdir): + sagemaker_container = _SageMakerContainer('local', 1, 'my-image') + sagemaker_container.container_root = str(tmpdir.mkdir('container-root')) + host = 'algo-1' + + sagemaker.local.image._create_config_file_directories(sagemaker_container.container_root, host) + + container_root = sagemaker_container.container_root + config_file_root = os.path.join(container_root, host, 'input', 'config') + + input_data_config_file = os.path.join(config_file_root, 'inputdataconfig.json') + + # write the config files, and then lets check they exist and have the right content. 
+ input_data_config = [ + { + 'ChannelName': 'channel_a', + 'DataUri': 'file:///tmp/source1', + 'ContentType': 'text/csv', + 'DataSource': { + 'FileDataSource': { + 'FileDataDistributionType': 'FullyReplicated', + 'FileUri': 'file:///tmp/source1' + } + } + }, + { + 'ChannelName': 'channel_b', + 'DataUri': 's3://my-own-bucket/prefix', + 'DataSource': { + 'S3DataSource': { + 'S3DataDistributionType': 'FullyReplicated', + 'S3DataType': 'S3Prefix', + 'S3Uri': 's3://my-own-bucket/prefix' + } + } + } + ] + sagemaker_container.write_config_files(host, HYPERPARAMETERS, input_data_config) + + assert os.path.exists(input_data_config_file) + parsed_input_config = json.load(open(input_data_config_file)) + # Validate Input Data Config + for channel in input_data_config: + assert channel['ChannelName'] in parsed_input_config + + # Channel A has a content type + assert 'ContentType' in parsed_input_config['channel_a'] + assert parsed_input_config['channel_a']['ContentType'] == 'text/csv' + + # Channel B does not have content type + assert 'ContentType' not in parsed_input_config['channel_b'] + + @patch('sagemaker.local.local_session.LocalSession') def test_retrieve_artifacts(LocalSession, tmpdir): sagemaker_container = _SageMakerContainer('local', 2, 'my-image')
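
A minimal usage sketch (not part of the patch series itself) of how the new local-mode output_path behavior is meant to be exercised, based on the estimator and integration-test changes in PATCH 1/3. The entry point, role name, and data locations below are placeholders, and the comments on artifact layout are assumptions drawn from the move_to_destination/retrieve_artifacts changes above.

    from sagemaker.local import LocalSession
    from sagemaker.mxnet import MXNet

    session = LocalSession()

    # New behavior: output_path may be a file:// URI when training locally.
    # Outside Local Mode, a file:// output_path now raises RuntimeError
    # (see the estimator.py hunk in PATCH 1/3).
    mx = MXNet(entry_point='mnist.py',            # placeholder training script
               role='SageMakerRole',              # placeholder IAM role name
               train_instance_count=1,
               train_instance_type='local',
               output_path='file:///tmp/model/',  # or 's3://my-bucket/prefix'
               sagemaker_session=session)

    # Channels can point at local data (file://) or S3 prefixes (s3://).
    mx.fit({'train': 'file:///tmp/mnist/train'})  # placeholder channel data

    # After fit(), the compressed artifacts (model.tar.gz and output.tar.gz)
    # are copied into the file:// directory, or uploaded under the job name
    # when the destination is s3://. Omitting output_path falls back to the
    # SDK's default S3 bucket, so the model artifacts are already a tarfile
    # in S3 and can back a SageMaker Model directly.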