diff --git a/CHANGELOG.rst b/CHANGELOG.rst index dd29ed26fa..a8900784e6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,11 +2,13 @@ CHANGELOG ========= -1.13.0 -====== +======== +1.13.dev +======== * feature: Estimator: add input mode to training channels * feature: Estimator: add model_uri and model_channel_name parameters +* enhancement: Local Mode: support output_path. Can be either file:// or s3:// 1.12.0 ====== diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 6e087abe8c..11b1ca6051 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -118,6 +118,9 @@ def __init__(self, role, train_instance_count, train_instance_type, self.base_job_name = base_job_name self._current_job_name = None + if (not self.sagemaker_session.local_mode + and output_path and output_path.startswith('file://')): + raise RuntimeError('file:// output paths are only supported in Local Mode') self.output_path = output_path self.output_kms_key = output_kms_key self.latest_training_job = None diff --git a/src/sagemaker/fw_utils.py b/src/sagemaker/fw_utils.py index 5c3d28245d..b35c0ca041 100644 --- a/src/sagemaker/fw_utils.py +++ b/src/sagemaker/fw_utils.py @@ -14,12 +14,10 @@ import os import re -import tarfile -import tempfile from collections import namedtuple from six.moves.urllib.parse import urlparse -from sagemaker.utils import name_from_image +import sagemaker.utils """This module contains utility functions shared across ``Framework`` components.""" @@ -128,14 +126,9 @@ def tar_and_upload_dir(session, bucket, s3_key_prefix, script, directory): s3 = session.resource('s3') key = '{}/{}'.format(s3_key_prefix, 'sourcedir.tar.gz') - with tempfile.TemporaryFile() as f: - with tarfile.open(mode='w:gz', fileobj=f) as t: - for sf in source_files: - # Add all files from the directory into the root of the directory structure of the tar - t.add(sf, arcname=os.path.basename(sf)) - # Need to reset the file descriptor position after writing to prepare for read - f.seek(0) - s3.Object(bucket, key).put(Body=f) + tar_file = sagemaker.utils.create_tar_file(source_files) + s3.Object(bucket, key).upload_file(tar_file) + os.remove(tar_file) return UploadedCode(s3_prefix='s3://{}/{}'.format(bucket, key), script_name=script_name) @@ -226,7 +219,8 @@ def model_code_key_prefix(code_location_key_prefix, model_name, image): Returns: str: the key prefix to be used in uploading code """ - return '/'.join(filter(None, [code_location_key_prefix, model_name or name_from_image(image)])) + training_job_name = sagemaker.utils.name_from_image(image) + return '/'.join(filter(None, [code_location_key_prefix, model_name or training_job_name])) def empty_framework_version_warning(default_version): diff --git a/src/sagemaker/local/data.py b/src/sagemaker/local/data.py index 6c90467233..5baa900891 100644 --- a/src/sagemaker/local/data.py +++ b/src/sagemaker/local/data.py @@ -13,6 +13,7 @@ from __future__ import absolute_import import os +import platform import sys import tempfile from abc import ABCMeta @@ -162,6 +163,12 @@ def __init__(self, bucket, prefix, sagemaker_session): root_dir = os.path.abspath(root_dir) working_dir = tempfile.mkdtemp(dir=root_dir) + # Docker cannot mount Mac OS /var folder properly see + # https://forums.docker.com/t/var-folders-isnt-mounted-properly/9600 + # Only apply this workaround if the user didn't provide an alternate storage root dir. 
+ if root_dir is None and platform.system() == 'Darwin': + working_dir = '/private{}'.format(working_dir) + sagemaker.utils.download_folder(bucket, prefix, working_dir, sagemaker_session) self.files = LocalFileDataSource(working_dir) diff --git a/src/sagemaker/local/entities.py b/src/sagemaker/local/entities.py index 199db7d5cb..6d7a4c7a4a 100644 --- a/src/sagemaker/local/entities.py +++ b/src/sagemaker/local/entities.py @@ -46,15 +46,20 @@ def __init__(self, container): self.start_time = None self.end_time = None - def start(self, input_data_config, hyperparameters, job_name): + def start(self, input_data_config, output_data_config, hyperparameters, job_name): for channel in input_data_config: if channel['DataSource'] and 'S3DataSource' in channel['DataSource']: data_distribution = channel['DataSource']['S3DataSource']['S3DataDistributionType'] + data_uri = channel['DataSource']['S3DataSource']['S3Uri'] elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']: data_distribution = channel['DataSource']['FileDataSource']['FileDataDistributionType'] + data_uri = channel['DataSource']['FileDataSource']['FileUri'] else: raise ValueError('Need channel[\'DataSource\'] to have [\'S3DataSource\'] or [\'FileDataSource\']') + # use a single Data URI - this makes handling S3 and File Data easier down the stack + channel['DataUri'] = data_uri + if data_distribution != 'FullyReplicated': raise RuntimeError('DataDistribution: %s is not currently supported in Local Mode' % data_distribution) @@ -62,7 +67,7 @@ def start(self, input_data_config, hyperparameters, job_name): self.start = datetime.datetime.now() self.state = self._TRAINING - self.model_artifacts = self.container.train(input_data_config, hyperparameters, job_name) + self.model_artifacts = self.container.train(input_data_config, output_data_config, hyperparameters, job_name) self.end = datetime.datetime.now() self.state = self._COMPLETED @@ -298,7 +303,7 @@ def _perform_batch_inference(self, input_data, output_data, **kwargs): if 'AssembleWith' in output_data and output_data['AssembleWith'] == 'Line': f.write(b'\n') - move_to_destination(working_dir, output_data['S3OutputPath'], self.local_session) + move_to_destination(working_dir, output_data['S3OutputPath'], self.name, self.local_session) self.container.stop_serving() diff --git a/src/sagemaker/local/image.py b/src/sagemaker/local/image.py index 67f488fae4..e950893e7b 100644 --- a/src/sagemaker/local/image.py +++ b/src/sagemaker/local/image.py @@ -33,6 +33,9 @@ import yaml import sagemaker +import sagemaker.local.data +import sagemaker.local.utils +import sagemaker.utils CONTAINER_PREFIX = 'algo' DOCKER_COMPOSE_FILENAME = 'docker-compose.yaml' @@ -78,7 +81,7 @@ def __init__(self, instance_type, instance_count, image, sagemaker_session=None) self.container_root = None self.container = None - def train(self, input_data_config, hyperparameters, job_name): + def train(self, input_data_config, output_data_config, hyperparameters, job_name): """Run a training job locally using docker-compose. Args: input_data_config (dict): The Input Data Configuration, this contains data such as the @@ -126,23 +129,17 @@ def train(self, input_data_config, hyperparameters, job_name): msg = "Failed to run: %s, %s" % (compose_command, str(e)) raise RuntimeError(msg) - s3_artifacts = self.retrieve_artifacts(compose_data) + artifacts = self.retrieve_artifacts(compose_data, output_data_config, job_name) # free up the training data directory as it may contain # lots of data downloaded from S3. 
This doesn't delete any local
         # data that was just mounted to the container.
-        _delete_tree(data_dir)
-        _delete_tree(shared_dir)
-        # Also free the container config files.
-        for host in self.hosts:
-            container_config_path = os.path.join(self.container_root, host)
-            _delete_tree(container_config_path)
-
-        self._cleanup()
-        # Print our Job Complete line to have a simmilar experience to training on SageMaker where you
+        dirs_to_delete = [data_dir, shared_dir]
+        self._cleanup(dirs_to_delete)
+        # Print our Job Complete line to have a similar experience to training on SageMaker where you
         # see this line at the end.
         print('===== Job Complete =====')
-        return s3_artifacts
+        return artifacts
 
     def serve(self, model_dir, environment):
         """Host a local endpoint using docker-compose.
@@ -188,7 +185,7 @@ def stop_serving(self):
         # for serving we can delete everything in the container root.
         _delete_tree(self.container_root)
 
-    def retrieve_artifacts(self, compose_data):
+    def retrieve_artifacts(self, compose_data, output_data_config, job_name):
         """Get the model artifacts from all the container nodes.
 
         Used after training completes to gather the data from all the individual containers. As the
@@ -201,26 +198,49 @@
         Returns:
             Local path to the collected model artifacts.
         """
-        # Grab the model artifacts from all the Nodes.
-        s3_artifacts = os.path.join(self.container_root, 's3_artifacts')
-        os.mkdir(s3_artifacts)
+        # We need a directory to store the artifacts from all the nodes
+        # and another one to contain the compressed final artifacts
+        artifacts = os.path.join(self.container_root, 'artifacts')
+        compressed_artifacts = os.path.join(self.container_root, 'compressed_artifacts')
+        os.mkdir(artifacts)
+
+        model_artifacts = os.path.join(artifacts, 'model')
+        output_artifacts = os.path.join(artifacts, 'output')
 
-        s3_model_artifacts = os.path.join(s3_artifacts, 'model')
-        s3_output_artifacts = os.path.join(s3_artifacts, 'output')
-        os.mkdir(s3_model_artifacts)
-        os.mkdir(s3_output_artifacts)
+        artifact_dirs = [model_artifacts, output_artifacts, compressed_artifacts]
+        for d in artifact_dirs:
+            os.mkdir(d)
 
+        # Gather the artifacts from all nodes into artifacts/model and artifacts/output
         for host in self.hosts:
             volumes = compose_data['services'][str(host)]['volumes']
-
             for volume in volumes:
                 host_dir, container_dir = volume.split(':')
                 if container_dir == '/opt/ml/model':
-                    sagemaker.local.utils.recursive_copy(host_dir, s3_model_artifacts)
+                    sagemaker.local.utils.recursive_copy(host_dir, model_artifacts)
                 elif container_dir == '/opt/ml/output':
-                    sagemaker.local.utils.recursive_copy(host_dir, s3_output_artifacts)
+                    sagemaker.local.utils.recursive_copy(host_dir, output_artifacts)
 
-        return s3_model_artifacts
+        # Tar Artifacts -> model.tar.gz and output.tar.gz
+        model_files = [os.path.join(model_artifacts, name) for name in os.listdir(model_artifacts)]
+        output_files = [os.path.join(output_artifacts, name) for name in os.listdir(output_artifacts)]
+        sagemaker.utils.create_tar_file(model_files, os.path.join(compressed_artifacts, 'model.tar.gz'))
+        sagemaker.utils.create_tar_file(output_files, os.path.join(compressed_artifacts, 'output.tar.gz'))
+
+        if output_data_config['S3OutputPath'] == '':
+            output_data = 'file://%s' % compressed_artifacts
+        else:
+            # Now we just need to move the compressed artifacts to wherever they are required
+            output_data = sagemaker.local.utils.move_to_destination(
+                compressed_artifacts,
+                output_data_config['S3OutputPath'],
+                job_name,
+                self.sagemaker_session)
+
+        _delete_tree(model_artifacts)
+        _delete_tree(output_artifacts)
+
+        return os.path.join(output_data, 'model.tar.gz')
 
     def write_config_files(self, host, hyperparameters, input_data_config):
         """Write the config files for the training containers.
@@ -235,7 +255,6 @@ def write_config_files(self, host, hyperparameters, input_data_config):
         Returns: None
         """
-
         config_path = os.path.join(self.container_root, host, 'input', 'config')
 
         resource_config = {
@@ -243,9 +262,14 @@
             'hosts': self.hosts
         }
 
-        json_input_data_config = {
-            c['ChannelName']: {'ContentType': 'application/octet-stream'} for c in input_data_config
-        }
+        json_input_data_config = {}
+        for c in input_data_config:
+            channel_name = c['ChannelName']
+            json_input_data_config[channel_name] = {
+                'TrainingInputMode': 'File'
+            }
+            if 'ContentType' in c:
+                json_input_data_config[channel_name]['ContentType'] = c['ContentType']
 
         _write_json_file(os.path.join(config_path, 'hyperparameters.json'), hyperparameters)
         _write_json_file(os.path.join(config_path, 'resourceconfig.json'), resource_config)
@@ -261,29 +286,13 @@ def _prepare_training_volumes(self, data_dir, input_data_config, hyperparameters
         # mount the local directory to the container. For S3 Data we will download the S3 data
         # first.
         for channel in input_data_config:
-            if channel['DataSource'] and 'S3DataSource' in channel['DataSource']:
-                uri = channel['DataSource']['S3DataSource']['S3Uri']
-            elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']:
-                uri = channel['DataSource']['FileDataSource']['FileUri']
-            else:
-                raise ValueError('Need channel[\'DataSource\'] to have'
-                                 ' [\'S3DataSource\'] or [\'FileDataSource\']')
-
-            parsed_uri = urlparse(uri)
-            key = parsed_uri.path.lstrip('/')
-
+            uri = channel['DataUri']
             channel_name = channel['ChannelName']
             channel_dir = os.path.join(data_dir, channel_name)
             os.mkdir(channel_dir)
 
-            if parsed_uri.scheme == 's3':
-                bucket_name = parsed_uri.netloc
-                sagemaker.utils.download_folder(bucket_name, key, channel_dir, self.sagemaker_session)
-            elif parsed_uri.scheme == 'file':
-                path = parsed_uri.path
-                volumes.append(_Volume(path, channel=channel_name))
-            else:
-                raise ValueError('Unknown URI scheme {}'.format(parsed_uri.scheme))
+            data_source = sagemaker.local.data.get_data_source_instance(uri, self.sagemaker_session)
+            volumes.append(_Volume(data_source.get_root_dir(), channel=channel_name))
 
         # If there is a training script directory and it is a local directory,
         # mount it to the container.
@@ -301,25 +310,20 @@ def _prepare_serving_volumes(self, model_location):
         volumes = []
         host = self.hosts[0]
         # Make the model available to the container. If this is a local file just mount it to
-        # the container as a volume. If it is an S3 location download it and extract the tar file.
+        # the container as a volume. If it is an S3 location, the DataSource will download it, we
+        # just need to extract the tar file.
host_dir = os.path.join(self.container_root, host) os.makedirs(host_dir) - if model_location.startswith('s3'): - container_model_dir = os.path.join(self.container_root, host, 'model') - os.makedirs(container_model_dir) + model_data_source = sagemaker.local.data.get_data_source_instance( + model_location, self.sagemaker_session) - parsed_uri = urlparse(model_location) - filename = os.path.basename(parsed_uri.path) - tar_location = os.path.join(container_model_dir, filename) - sagemaker.utils.download_file(parsed_uri.netloc, parsed_uri.path, tar_location, self.sagemaker_session) + for filename in model_data_source.get_file_list(): + if tarfile.is_tarfile(filename): + with tarfile.open(filename) as tar: + tar.extractall(path=model_data_source.get_root_dir()) - if tarfile.is_tarfile(tar_location): - with tarfile.open(tar_location) as tar: - tar.extractall(path=container_model_dir) - volumes.append(_Volume(container_model_dir, '/opt/ml/model')) - else: - volumes.append(_Volume(model_location, '/opt/ml/model')) + volumes.append(_Volume(model_data_source.get_root_dir(), '/opt/ml/model')) return volumes @@ -368,7 +372,6 @@ def _generate_compose_file(self, command, additional_volumes=None, additional_en 'networks': { 'sagemaker-local': {'name': 'sagemaker-local'} } - } docker_compose_path = os.path.join(self.container_root, DOCKER_COMPOSE_FILENAME) @@ -469,9 +472,15 @@ def _build_optml_volumes(self, host, subdirs): return volumes - def _cleanup(self): - # we don't need to cleanup anything at the moment - pass + def _cleanup(self, dirs_to_delete=None): + if dirs_to_delete: + for d in dirs_to_delete: + _delete_tree(d) + + # Free the container config files. + for host in self.hosts: + container_config_path = os.path.join(self.container_root, host) + _delete_tree(container_config_path) class _HostingContainer(Thread): @@ -610,7 +619,7 @@ def _aws_credentials(session): 'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key)) ] elif not _aws_credentials_available_in_metadata_service(): - logger.warn("Using the short-lived AWS credentials found in session. They might expire while running.") + logger.warning("Using the short-lived AWS credentials found in session. They might expire while running.") return [ 'AWS_ACCESS_KEY_ID=%s' % (str(access_key)), 'AWS_SECRET_ACCESS_KEY=%s' % (str(secret_key)), diff --git a/src/sagemaker/local/local_session.py b/src/sagemaker/local/local_session.py index 8361e18dab..134d90ffe3 100644 --- a/src/sagemaker/local/local_session.py +++ b/src/sagemaker/local/local_session.py @@ -71,7 +71,7 @@ def create_training_job(self, TrainingJobName, AlgorithmSpecification, InputData AlgorithmSpecification['TrainingImage'], self.sagemaker_session) training_job = _LocalTrainingJob(container) hyperparameters = kwargs['HyperParameters'] if 'HyperParameters' in kwargs else {} - training_job.start(InputDataConfig, hyperparameters, TrainingJobName) + training_job.start(InputDataConfig, OutputDataConfig, hyperparameters, TrainingJobName) LocalSagemakerClient._training_jobs[TrainingJobName] = training_job diff --git a/src/sagemaker/local/utils.py b/src/sagemaker/local/utils.py index 355c4f77d2..a79f36df78 100644 --- a/src/sagemaker/local/utils.py +++ b/src/sagemaker/local/utils.py @@ -39,25 +39,31 @@ def copy_directory_structure(destination_directory, relative_path): os.makedirs(destination_directory, relative_path) -def move_to_destination(source, destination, sagemaker_session): +def move_to_destination(source, destination, job_name, sagemaker_session): """move source to destination. 
Can handle uploading to S3 Args: source (str): root directory to move destination (str): file:// or s3:// URI that source will be moved to. sagemaker_session (sagemaker.Session): a sagemaker_session to interact with S3 if needed + + Returns: + (str): destination URI """ parsed_uri = urlparse(destination) if parsed_uri.scheme == 'file': recursive_copy(source, parsed_uri.path) + final_uri = destination elif parsed_uri.scheme == 's3': bucket = parsed_uri.netloc - path = parsed_uri.path.strip('/') + path = "%s%s" % (parsed_uri.path.lstrip('/'), job_name) + final_uri = 's3://%s/%s' % (bucket, path) sagemaker_session.upload_data(source, bucket, path) else: raise ValueError('Invalid destination URI, must be s3:// or file://, got: %s' % destination) shutil.rmtree(source) + return final_uri def recursive_copy(source, destination): diff --git a/src/sagemaker/utils.py b/src/sagemaker/utils.py index 160b78a1a4..9cd4bc0296 100644 --- a/src/sagemaker/utils.py +++ b/src/sagemaker/utils.py @@ -16,6 +16,8 @@ import os import re import sys +import tarfile +import tempfile import time from datetime import datetime @@ -242,6 +244,28 @@ def download_folder(bucket_name, prefix, target, sagemaker_session): obj.download_file(file_path) +def create_tar_file(source_files, target=None): + """Create a tar file containing all the source_files + + Args: + source_files (List[str]): List of file paths that will be contained in the tar file + + Returns: + (str): path to created tar file + + """ + if target: + filename = target + else: + _, filename = tempfile.mkstemp() + + with tarfile.open(filename, mode='w:gz') as t: + for sf in source_files: + # Add all files from the directory into the root of the directory structure of the tar + t.add(sf, arcname=os.path.basename(sf)) + return filename + + def download_file(bucket_name, path, target, sagemaker_session): """Download a Single File from S3 into a local path diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index ace201a7fa..b51c844c8e 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -21,9 +21,8 @@ import pytest from sagemaker.local import LocalSession, LocalSagemakerRuntimeClient, LocalSagemakerClient -from sagemaker.mxnet import MXNet, MXNetModel +from sagemaker.mxnet import MXNet from sagemaker.tensorflow import TensorFlow -from sagemaker.fw_utils import tar_and_upload_dir from tests.integ import DATA_DIR, PYTHON_VERSION from tests.integ.timeout import timeout @@ -58,21 +57,24 @@ def _initialize(self, boto_session, sagemaker_client, sagemaker_runtime_client): @pytest.fixture(scope='module') def mxnet_model(sagemaker_local_session): - script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') - data_path = os.path.join(DATA_DIR, 'mxnet_mnist') + def _create_model(output_path): + script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') + data_path = os.path.join(DATA_DIR, 'mxnet_mnist') - mx = MXNet(entry_point=script_path, role='SageMakerRole', - train_instance_count=1, train_instance_type='local', - sagemaker_session=sagemaker_local_session) + mx = MXNet(entry_point=script_path, role='SageMakerRole', + train_instance_count=1, train_instance_type='local', + output_path=output_path, + sagemaker_session=sagemaker_local_session) - train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), - key_prefix='integ-test-data/mxnet_mnist/train') - test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), - key_prefix='integ-test-data/mxnet_mnist/test') + 
train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), + key_prefix='integ-test-data/mxnet_mnist/train') + test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), + key_prefix='integ-test-data/mxnet_mnist/test') - mx.fit({'train': train_input, 'test': test_input}) - model = mx.create_model(1) - return model + mx.fit({'train': train_input, 'test': test_input}) + model = mx.create_model(1) + return model + return _create_model @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") @@ -259,15 +261,9 @@ def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model): local_mode_lock_fd = open(LOCK_PATH, 'w') local_mode_lock = local_mode_lock_fd.fileno() - model_data = mxnet_model.model_data - boto_session = sagemaker_local_session.boto_session - default_bucket = sagemaker_local_session.default_bucket() - uploaded_data = tar_and_upload_dir(boto_session, default_bucket, - 'test_mxnet_local_mode', '', model_data) - - s3_model = MXNetModel(model_data=uploaded_data.s3_prefix, role='SageMakerRole', - entry_point=mxnet_model.entry_point, image=mxnet_model.image, - sagemaker_session=sagemaker_local_session) + path = 's3://%s' % sagemaker_local_session.default_bucket() + s3_model = mxnet_model(path) + s3_model.sagemaker_session = sagemaker_local_session predictor = None try: @@ -285,7 +281,7 @@ def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model): fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) -def test_local_mode_serving_from_local_model(sagemaker_local_session, mxnet_model): +def test_local_mode_serving_from_local_model(tmpdir, sagemaker_local_session, mxnet_model): local_mode_lock_fd = open(LOCK_PATH, 'w') local_mode_lock = local_mode_lock_fd.fileno() predictor = None @@ -295,8 +291,10 @@ def test_local_mode_serving_from_local_model(sagemaker_local_session, mxnet_mode # to allow concurrent test execution. The serving test is really fast so it still # makes sense to allow this behavior. 
fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - mxnet_model.sagemaker_session = sagemaker_local_session - predictor = mxnet_model.deploy(initial_instance_count=1, instance_type='local') + path = 'file://%s' % (str(tmpdir)) + model = mxnet_model(path) + model.sagemaker_session = sagemaker_local_session + predictor = model.deploy(initial_instance_count=1, instance_type='local') data = numpy.zeros(shape=(1, 1, 28, 28)) predictor.predict(data) finally: diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index 73c6fd2241..a1de343b3c 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -914,4 +914,24 @@ def test_distributed_gpu_local_mode(LocalSession): with pytest.raises(RuntimeError): Estimator(IMAGE_NAME, ROLE, 3, 'local_gpu', output_path=OUTPUT_PATH) + +@patch('sagemaker.estimator.LocalSession') +def test_local_mode_file_output_path(local_session_class): + local_session = Mock() + local_session.local_mode = True + local_session_class.return_value = local_session + + e = Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, 'local', output_path='file:///tmp/model/') + assert e.output_path == 'file:///tmp/model/' + + +@patch('sagemaker.estimator.Session') +def test_file_output_path_not_supported_outside_local_mode(session_class): + session = Mock() + session.local_mode = False + session_class.return_value = session + + with pytest.raises(RuntimeError): + Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, INSTANCE_TYPE, output_path='file:///tmp/model') + ################################################################################# diff --git a/tests/unit/test_image.py b/tests/unit/test_image.py index ac4cab8ae2..1d5f62b8a2 100644 --- a/tests/unit/test_image.py +++ b/tests/unit/test_image.py @@ -22,10 +22,11 @@ import json import os import subprocess +import tarfile import pytest import yaml -from mock import call, patch, Mock, MagicMock +from mock import patch, Mock, MagicMock import sagemaker from sagemaker.local.image import _SageMakerContainer, _aws_credentials @@ -37,6 +38,7 @@ INPUT_DATA_CONFIG = [ { 'ChannelName': 'a', + 'DataUri': 'file:///tmp/source1', 'DataSource': { 'FileDataSource': { 'FileDataDistributionType': 'FullyReplicated', @@ -46,6 +48,7 @@ }, { 'ChannelName': 'b', + 'DataUri': 's3://my-own-bucket/prefix', 'DataSource': { 'S3DataSource': { 'S3DataDistributionType': 'FullyReplicated', @@ -55,11 +58,15 @@ } } ] + +OUTPUT_DATA_CONFIG = { + 'S3OutputPath': '' +} + HYPERPARAMETERS = {'a': 1, 'b': json.dumps('bee'), 'sagemaker_submit_directory': json.dumps('s3://my_bucket/code')} - LOCAL_CODE_HYPERPARAMETERS = {'a': 1, 'b': 2, 'sagemaker_submit_directory': json.dumps('file:///tmp/code')} @@ -81,7 +88,6 @@ def sagemaker_session(): @patch('sagemaker.local.local_session.LocalSession') def test_write_config_file(LocalSession, tmpdir): - sagemaker_container = _SageMakerContainer('local', 2, 'my-image') sagemaker_container.container_root = str(tmpdir.mkdir('container-root')) host = "algo-1" @@ -120,6 +126,60 @@ def test_write_config_file(LocalSession, tmpdir): assert channel['ChannelName'] in input_data_config_data +@patch('sagemaker.local.local_session.LocalSession') +def test_write_config_files_input_content_type(LocalSession, tmpdir): + sagemaker_container = _SageMakerContainer('local', 1, 'my-image') + sagemaker_container.container_root = str(tmpdir.mkdir('container-root')) + host = 'algo-1' + + sagemaker.local.image._create_config_file_directories(sagemaker_container.container_root, host) + + container_root = sagemaker_container.container_root + 
config_file_root = os.path.join(container_root, host, 'input', 'config') + + input_data_config_file = os.path.join(config_file_root, 'inputdataconfig.json') + + # write the config files, and then lets check they exist and have the right content. + input_data_config = [ + { + 'ChannelName': 'channel_a', + 'DataUri': 'file:///tmp/source1', + 'ContentType': 'text/csv', + 'DataSource': { + 'FileDataSource': { + 'FileDataDistributionType': 'FullyReplicated', + 'FileUri': 'file:///tmp/source1' + } + } + }, + { + 'ChannelName': 'channel_b', + 'DataUri': 's3://my-own-bucket/prefix', + 'DataSource': { + 'S3DataSource': { + 'S3DataDistributionType': 'FullyReplicated', + 'S3DataType': 'S3Prefix', + 'S3Uri': 's3://my-own-bucket/prefix' + } + } + } + ] + sagemaker_container.write_config_files(host, HYPERPARAMETERS, input_data_config) + + assert os.path.exists(input_data_config_file) + parsed_input_config = json.load(open(input_data_config_file)) + # Validate Input Data Config + for channel in input_data_config: + assert channel['ChannelName'] in parsed_input_config + + # Channel A has a content type + assert 'ContentType' in parsed_input_config['channel_a'] + assert parsed_input_config['channel_a']['ContentType'] == 'text/csv' + + # Channel B does not have content type + assert 'ContentType' not in parsed_input_config['channel_b'] + + @patch('sagemaker.local.local_session.LocalSession') def test_retrieve_artifacts(LocalSession, tmpdir): sagemaker_container = _SageMakerContainer('local', 2, 'my-image') @@ -144,50 +204,59 @@ def test_retrieve_artifacts(LocalSession, tmpdir): } } - dirs1 = ['model', 'model/data'] - dirs2 = ['model', 'model/data', 'model/tmp'] - dirs3 = ['output', 'output/data'] - dirs4 = ['output', 'output/data', 'output/log'] - - files1 = ['model/data/model.json', 'model/data/variables.csv'] - files2 = ['model/data/model.json', 'model/data/variables2.csv', 'model/tmp/something-else.json'] - files3 = ['output/data/loss.json', 'output/data/accuracy.json'] - files4 = ['output/data/loss.json', 'output/data/accuracy2.json', 'output/log/warnings.txt'] - - expected = ['model', 'model/data/', 'model/data/model.json', 'model/data/variables.csv', - 'model/data/variables2.csv', 'model/tmp/something-else.json', 'output', 'output/data', 'output/log', - 'output/data/loss.json', 'output/data/accuracy.json', 'output/data/accuracy2.json', - 'output/log/warnings.txt'] - - for d in dirs1: - os.mkdir(os.path.join(volume1, d)) - for d in dirs2: - os.mkdir(os.path.join(volume2, d)) - for d in dirs3: - os.mkdir(os.path.join(volume1, d)) - for d in dirs4: - os.mkdir(os.path.join(volume2, d)) + dirs = [ + ('model', volume1), ('model/data', volume1), + ('model', volume2), ('model/data', volume2), ('model/tmp', volume2), + ('output', volume1), ('output/data', volume1), + ('output', volume2), ('output/data', volume2), ('output/log', volume2) + ] + + files = [ + ('model/data/model.json', volume1), ('model/data/variables.csv', volume1), + ('model/data/model.json', volume2), ('model/data/variables2.csv', volume2), + ('model/tmp/something-else.json', volume2), + ('output/data/loss.json', volume1), ('output/data/accuracy.json', volume1), + ('output/data/loss.json', volume2), ('output/data/accuracy2.json', volume2), + ('output/log/warnings.txt', volume2) + ] + + expected_model = ['data', 'data/model.json', 'data/variables.csv', + 'data/variables2.csv', 'tmp/something-else.json'] + expected_output = ['data', 'log', 'data/loss.json', 'data/accuracy.json', 'data/accuracy2.json', + 'log/warnings.txt'] + + for d, volume in 
dirs: + os.mkdir(os.path.join(volume, d)) # create all the files - for f in files1: - open(os.path.join(volume1, f), 'a').close() - for f in files2: - open(os.path.join(volume2, f), 'a').close() - for f in files3: - open(os.path.join(volume1, f), 'a').close() - for f in files4: - open(os.path.join(volume2, f), 'a').close() + for f, volume in files: + open(os.path.join(volume, f), 'a').close() - s3_model_artifacts = sagemaker_container.retrieve_artifacts(compose_data) - s3_artifacts = os.path.dirname(s3_model_artifacts) + output_path = str(tmpdir.mkdir('exported_files')) + output_data_config = { + 'S3OutputPath': 'file://%s' % output_path + } - for f in expected: - assert set(os.listdir(s3_artifacts)) == set(['model', 'output']) - assert os.path.exists(os.path.join(s3_artifacts, f)) + model_artifacts = sagemaker_container.retrieve_artifacts( + compose_data, output_data_config, sagemaker_session).replace('file://', '') + artifacts = os.path.dirname(model_artifacts) + # we have both the tar files + assert set(os.listdir(artifacts)) == {'model.tar.gz', 'output.tar.gz'} -def test_stream_output(): + # check that the tar files contain what we expect + tar = tarfile.open(os.path.join(output_path, 'model.tar.gz')) + model_tar_files = [m.name for m in tar.getmembers()] + for f in expected_model: + assert f in model_tar_files + tar = tarfile.open(os.path.join(output_path, 'output.tar.gz')) + output_tar_files = [m.name for m in tar.getmembers()] + for f in expected_output: + assert f in output_tar_files + + +def test_stream_output(): # it should raise an exception if the command fails with pytest.raises(RuntimeError): p = subprocess.Popen(['ls', '/some/unknown/path'], @@ -203,7 +272,6 @@ def test_stream_output(): def test_check_output(): - with pytest.raises(Exception): sagemaker.local.image._check_output(['ls', '/some/unknown/path']) @@ -216,14 +284,12 @@ def test_check_output(): assert output == msg -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') +@patch('sagemaker.local.local_session.LocalSession', Mock()) +@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) @patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, - tmpdir, sagemaker_session): - +def test_train(popen, tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -231,14 +297,10 @@ def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, instance_count = 2 image = 'my-image' sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) - - channel_dir = os.path.join(directories[1], 'b') - download_folder_calls = [call('my-own-bucket', 'prefix', channel_dir, sagemaker_session)] - download_folder.assert_has_calls(download_folder_calls) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') - call_args = popen.call_args[0][0] assert call_args is not None 
@@ -260,20 +322,19 @@ def test_train(download_folder, _cleanup, popen, _stream_output, LocalSession, assert os.path.exists(os.path.join(sagemaker_container.container_root, 'output/data')) -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train_with_hyperparameters_without_job_name(download_folder, _cleanup, _stream_output, LocalSession, tmpdir): - +@patch('sagemaker.local.local_session.LocalSession', Mock()) +@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +def test_train_with_hyperparameters_without_job_name(tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): - instance_count = 2 image = 'my-image' - sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=LocalSession) - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') @@ -283,12 +344,12 @@ def test_train_with_hyperparameters_without_job_name(download_folder, _cleanup, assert 'TRAINING_JOB_NAME={}'.format(TRAINING_JOB_NAME) in config['services'][h]['environment'] -@patch('sagemaker.local.local_session.LocalSession') +@patch('sagemaker.local.local_session.LocalSession', Mock()) @patch('sagemaker.local.image._stream_output', side_effect=RuntimeError('this is expected')) -@patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train_error(download_folder, _cleanup, popen, _stream_output, LocalSession, tmpdir, sagemaker_session): +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +@patch('subprocess.Popen', Mock()) +def test_train_error(_stream_output, tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -297,18 +358,18 @@ def test_train_error(download_folder, _cleanup, popen, _stream_output, LocalSess sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) with pytest.raises(RuntimeError) as e: - sagemaker_container.train(INPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, HYPERPARAMETERS, TRAINING_JOB_NAME) assert 'this is expected' in str(e) -@patch('sagemaker.local.local_session.LocalSession') -@patch('sagemaker.local.image._stream_output') -@patch('subprocess.Popen') -@patch('sagemaker.local.image._SageMakerContainer._cleanup') -@patch('sagemaker.utils.download_folder') -def test_train_local_code(download_folder, _cleanup, popen, _stream_output, - _local_session, tmpdir, sagemaker_session): +@patch('sagemaker.local.local_session.LocalSession', Mock()) 
+@patch('sagemaker.local.image._stream_output', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._cleanup', Mock()) +@patch('sagemaker.local.data.get_data_source_instance', Mock()) +@patch('subprocess.Popen', Mock()) +def test_train_local_code(tmpdir, sagemaker_session): directories = [str(tmpdir.mkdir('container-root')), str(tmpdir.mkdir('data'))] with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', side_effect=directories): @@ -317,7 +378,8 @@ def test_train_local_code(download_folder, _cleanup, popen, _stream_output, sagemaker_container = _SageMakerContainer('local', instance_count, image, sagemaker_session=sagemaker_session) - sagemaker_container.train(INPUT_DATA_CONFIG, LOCAL_CODE_HYPERPARAMETERS, TRAINING_JOB_NAME) + sagemaker_container.train( + INPUT_DATA_CONFIG, OUTPUT_DATA_CONFIG, LOCAL_CODE_HYPERPARAMETERS, TRAINING_JOB_NAME) docker_compose_file = os.path.join(sagemaker_container.container_root, 'docker-compose.yaml') @@ -345,7 +407,7 @@ def test_container_has_gpu_support(tmpdir, sagemaker_session): assert docker_host['runtime'] == 'nvidia' -def test_container_does_not_enable_nvidia_docker_for_cpu_containers(tmpdir, sagemaker_session): +def test_container_does_not_enable_nvidia_docker_for_cpu_containers(sagemaker_session): instance_count = 1 image = 'my-image' sagemaker_container = _SageMakerContainer('local', instance_count, image, @@ -355,14 +417,13 @@ def test_container_does_not_enable_nvidia_docker_for_cpu_containers(tmpdir, sage assert 'runtime' not in docker_host -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = _SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) environment = { @@ -382,14 +443,13 @@ def test_serve(up, copy, copytree, tmpdir, sagemaker_session): assert config['services'][h]['command'] == 'serve' -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve_local_code(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) +@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve_local_code(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = _SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) environment = { @@ -413,14 +473,13 @@ def test_serve_local_code(up, copy, copytree, tmpdir, sagemaker_session): assert '%s:/opt/ml/code' % '/tmp/code' in volumes -@patch('sagemaker.local.image._HostingContainer.run') -@patch('shutil.copy') -@patch('shutil.copytree') -def test_serve_local_code_no_env(up, copy, copytree, tmpdir, sagemaker_session): - +@patch('sagemaker.local.image._HostingContainer.run', Mock()) 
+@patch('sagemaker.local.image._SageMakerContainer._prepare_serving_volumes', Mock(return_value=[])) +@patch('shutil.copy', Mock()) +@patch('shutil.copytree', Mock()) +def test_serve_local_code_no_env(tmpdir, sagemaker_session): with patch('sagemaker.local.image._SageMakerContainer._create_tmp_folder', return_value=str(tmpdir.mkdir('container-root'))): - image = 'my-image' sagemaker_container = _SageMakerContainer('local', 1, image, sagemaker_session=sagemaker_session) sagemaker_container.serve('/some/model/path', {}) @@ -435,36 +494,41 @@ def test_serve_local_code_no_env(up, copy, copytree, tmpdir, sagemaker_session): assert config['services'][h]['command'] == 'serve' -@patch('sagemaker.utils.download_file') +@patch('sagemaker.local.data.get_data_source_instance') @patch('tarfile.is_tarfile') @patch('tarfile.open', MagicMock()) @patch('os.makedirs', Mock()) -def test_prepare_serving_volumes_with_s3_model(is_tarfile, download_file, sagemaker_session): - +def test_prepare_serving_volumes_with_s3_model(is_tarfile, get_data_source_instance, sagemaker_session): sagemaker_container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session) sagemaker_container.container_root = '/tmp/container_root' - container_model_dir = os.path.join('/tmp/container_root/', sagemaker_container.hosts[0], 'model') + s3_data_source = Mock() + s3_data_source.get_root_dir.return_value = '/tmp/downloaded/data/' + s3_data_source.get_file_list.return_value = ['/tmp/downloaded/data/my_model.tar.gz'] + get_data_source_instance.return_value = s3_data_source is_tarfile.return_value = True volumes = sagemaker_container._prepare_serving_volumes('s3://bucket/my_model.tar.gz') - - tar_location = os.path.join(container_model_dir, 'my_model.tar.gz') - download_file.assert_called_with('bucket', '/my_model.tar.gz', tar_location, sagemaker_session) - is_tarfile.assert_called_with(tar_location) + is_tarfile.assert_called_with('/tmp/downloaded/data/my_model.tar.gz') assert len(volumes) == 1 assert volumes[0].container_dir == '/opt/ml/model' - assert volumes[0].host_dir == container_model_dir + assert volumes[0].host_dir == '/tmp/downloaded/data/' +@patch('sagemaker.local.data.get_data_source_instance') +@patch('tarfile.is_tarfile', Mock(return_value=False)) @patch('os.makedirs', Mock()) -def test_prepare_serving_volumes_with_local_model(sagemaker_session): - +def test_prepare_serving_volumes_with_local_model(get_data_source_instance, sagemaker_session): sagemaker_container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session) sagemaker_container.container_root = '/tmp/container_root' - volumes = sagemaker_container._prepare_serving_volumes('/path/to/my_model') + local_file_data_source = Mock() + local_file_data_source.get_root_dir.return_value = '/path/to/my_model' + local_file_data_source.get_file_list.return_value = ['/path/to/my_model/model'] + get_data_source_instance.return_value = local_file_data_source + + volumes = sagemaker_container._prepare_serving_volumes('file:///path/to/my_model') assert len(volumes) == 1 assert volumes[0].container_dir == '/opt/ml/model' diff --git a/tests/unit/test_local_entities.py b/tests/unit/test_local_entities.py index 86180951dd..6261031bdf 100644 --- a/tests/unit/test_local_entities.py +++ b/tests/unit/test_local_entities.py @@ -157,7 +157,7 @@ def test_local_transform_job_perform_batch_inference(get_config_value, move_to_d local_transform_job._perform_batch_inference(input_data, output_data, **transform_kwargs) - dir, output, session = 
move_to_destination.call_args[0] + dir, output, job_name, session = move_to_destination.call_args[0] assert output == 's3://bucket/output' output_files = os.listdir(dir) assert len(output_files) == 2 diff --git a/tests/unit/test_local_utils.py b/tests/unit/test_local_utils.py index a4f2bb35fb..5f48b68602 100644 --- a/tests/unit/test_local_utils.py +++ b/tests/unit/test_local_utils.py @@ -15,7 +15,7 @@ import pytest from mock import patch, Mock -import sagemaker +import sagemaker.local.utils BUCKET_NAME = 'some-nice-bucket' @@ -24,14 +24,15 @@ @patch('sagemaker.local.utils.recursive_copy') def test_move_to_destination(recursive_copy): # local files will just be recursive copied - sagemaker.local.utils.move_to_destination('/tmp/data', 'file:///target/dir/', None) + sagemaker.local.utils.move_to_destination('/tmp/data', 'file:///target/dir/', 'job', None) recursive_copy.assert_called() # s3 destination will upload to S3 sms = Mock() - sagemaker.local.utils.move_to_destination('/tmp/data', 's3://bucket/path', sms) + sagemaker.local.utils.move_to_destination('/tmp/data', 's3://bucket/path', 'job', sms) sms.upload_data.assert_called() - # weird destination, should raise an exception + +def test_move_to_destination_illegal_destination(): with pytest.raises(ValueError): - sagemaker.local.utils.move_to_destination('/tmp/data', 'ftp://ftp/in/2018', None) + sagemaker.local.utils.move_to_destination('/tmp/data', 'ftp://ftp/in/2018', 'job', None) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 14dcff4a96..0a1521fb54 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -252,3 +252,24 @@ def test_download_file(): '/tmp/file.tar.gz', session) bucket_mock.download_file.assert_called_with('prefix/path/file.tar.gz', '/tmp/file.tar.gz') + + +@patch('tarfile.open') +def test_create_tar_file_with_provided_path(open): + open.return_value = open + open.__enter__ = Mock() + open.__exit__ = Mock(return_value=None) + file_list = ['/tmp/a', '/tmp/b'] + path = sagemaker.utils.create_tar_file(file_list, target='/my/custom/path.tar.gz') + assert path == '/my/custom/path.tar.gz' + + +@patch('tarfile.open') +@patch('tempfile.mkstemp', Mock(return_value=(None, '/auto/generated/path'))) +def test_create_tar_file_with_auto_generated_path(open): + open.return_value = open + open.__enter__ = Mock() + open.__exit__ = Mock(return_value=None) + file_list = ['/tmp/a', '/tmp/b'] + path = sagemaker.utils.create_tar_file(file_list) + assert path == '/auto/generated/path' diff --git a/tox.ini b/tox.ini index 45847d1024..9660734fd0 100644 --- a/tox.ini +++ b/tox.ini @@ -39,7 +39,8 @@ ignore = FI54, FI55, FI56, - FI57 + FI57, + W503 require-code = True
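
For context, a minimal sketch of how the new Local Mode output_path support is exercised end to end. The entry point, channel locations, and output directory below are illustrative assumptions, not part of this change; only the output_path behaviour itself comes from the diff above.

    from sagemaker.local import LocalSession
    from sagemaker.mxnet import MXNet

    session = LocalSession()

    # file:// output paths are only accepted in Local Mode; the Estimator now raises
    # a RuntimeError if one is combined with a non-local instance type.
    mx = MXNet(entry_point='mnist.py',
               role='SageMakerRole',
               train_instance_count=1,
               train_instance_type='local',
               output_path='file:///tmp/model',  # or 's3://my-bucket/prefix'
               sagemaker_session=session)

    mx.fit({'train': 'file:///tmp/mnist/train', 'test': 'file:///tmp/mnist/test'})

After training, retrieve_artifacts packages the container output into model.tar.gz and output.tar.gz, and move_to_destination copies them to the file:// directory (or uploads them under the S3 output path keyed by the job name) before returning the final destination URI.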
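
Similarly, a small sketch of the new sagemaker.utils.create_tar_file helper that tar_and_upload_dir and the Local Mode artifact packaging now share; the files created here are throwaway examples.

    import os
    import tempfile

    from sagemaker.utils import create_tar_file

    # A couple of placeholder files to package (purely illustrative).
    workdir = tempfile.mkdtemp()
    sources = []
    for name in ('train.py', 'requirements.txt'):
        path = os.path.join(workdir, name)
        open(path, 'w').close()
        sources.append(path)

    # Without a target, a temporary file is created via tempfile.mkstemp and its path
    # is returned; the caller removes it when done, as fw_utils.tar_and_upload_dir does.
    tmp_tar = create_tar_file(sources)
    os.remove(tmp_tar)

    # With an explicit target, the gzipped tar is written to that path instead.
    create_tar_file(sources, target=os.path.join(workdir, 'sourcedir.tar.gz'))

Each file is added at the root of the archive (arcname is its basename), which matches how sourcedir.tar.gz was built before this refactor.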