diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..62c89355
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea/
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..2482f02c
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,71 @@
+# Changelog
+
+## v1.0.7 (2019-08-05)
+
+### Bug fixes and other changes
+
+ * upgrade sagemaker-container version
+ * unmark 2 deploy tests
+ * update p2 restricted regions
+
+## v1.0.6 (2019-06-21)
+
+### Bug fixes and other changes
+
+ * unmark 2 deploy tests
+
+## v1.0.5 (2019-06-20)
+
+### Bug fixes and other changes
+
+ * update p2 restricted regions
+
+## v1.0.4 (2019-06-19)
+
+### Bug fixes and other changes
+
+ * skip tests in gpu instance restricted regions
+
+## v1.0.3 (2019-06-18)
+
+### Bug fixes and other changes
+
+ * modify buildspecs and tox files
+
+## v1.0.2 (2019-06-17)
+
+### Bug fixes and other changes
+
+ * freeze dependency versions
+
+## v1.0.1 (2019-06-13)
+
+### Bug fixes and other changes
+
+ * add buildspec-release file and upgrade cuda version
+ * upgrade PyTorch to 1.1
+ * disable test_mnist_gpu for py2 for now
+ * fix broken line of buildspec
+ * prevent hidden errors in buildspec
+ * Add AWS CodeBuild buildspec for pull request
+ * Bump minimum SageMaker Containers version to 2.4.6 and pin SageMaker Python SDK to 1.18.16
+ * fix broken link in README
+ * Add timeout to test_mnist_gpu test
+ * Use dummy role in tests and update local failure integ test
+ * Use the SageMaker Python SDK for local serving integ tests
+ * Use the SageMaker Python SDK for local integ distributed training tests
+ * Use the SageMaker Python SDK for local integ single-machine training tests
+ * Pin fastai version to 1.0.39 in CPU dockerfile
+ * Use the SageMaker Python SDK for SageMaker integration tests
+ * Add missing rendering dependencies for opencv and a simple test.
+ * Add opencv support.
+ * Freeze PyYAML version to avoid conflict with Docker Compose
+ * Unfreeze numpy version.
+ * Freeze TorchVision to 0.2.1
+ * Specify region when creating S3 resource in integ tests
+ * Read framework version from Python SDK for integ test default
+ * Fix unicode display problem in py2 container
+ * freeze pip <=18.1, fastai == 1.0.39, numpy <= 1.15.4
+ * Add support for fastai (https://github.com/fastai/fastai) library.
+ * Remove "requsests" from tests dependencies to avoid regular conflicts with "requests" package from "sagemaker" dependencies.
+ * Add support for PyTorch-1.0.
diff --git a/README.md b/README.md
deleted file mode 100644
index fea8e2ac..00000000
--- a/README.md
+++ /dev/null
@@ -1,7 +0,0 @@
-## sagemaker-pytorch-serving-container
-
-A real mystery
-
-## License
-
-This library is licensed under the Apache 2.0 License.
diff --git a/README.rst b/README.rst
new file mode 100644
index 00000000..37cb3eca
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,266 @@
+
+===================================
+SageMaker PyTorch Serving Container
+===================================
+
+SageMaker PyTorch Serving Container is an open source library for making the
+PyTorch framework run on Amazon SageMaker.
+
+This repository also contains Dockerfiles which install this library, PyTorch, and dependencies
+for building SageMaker PyTorch images.
+
+The SageMaker team uses this repository to build its official PyTorch image. To use this image on SageMaker,
+see `Python SDK `__.
+For end users, this repository is typically of interest if you need implementation details for
+the official image, or if you want to use it to build your own customized PyTorch image.
+
+For information on running PyTorch jobs on SageMaker: `SageMaker PyTorch Estimators and Models
+`__.
+
+For notebook examples: `SageMaker Notebook
+Examples `__.
+
+Table of Contents
+-----------------
+
+#. `Getting Started <#getting-started>`__
+#. `Building your Image <#building-your-image>`__
+#. `Running the tests <#running-the-tests>`__
+
+Getting Started
+---------------
+
+Prerequisites
+~~~~~~~~~~~~~
+
+Make sure you have installed all of the following prerequisites on your
+development machine:
+
+- `Docker `__
+
+For Testing on GPU
+^^^^^^^^^^^^^^^^^^
+
+- `Nvidia-Docker `__
+
+Recommended
+^^^^^^^^^^^
+
+- A Python environment management tool (e.g.
+  `PyEnv `__,
+  `VirtualEnv `__)
+
+Building your image
+-------------------
+
+`Amazon SageMaker `__
+uses Docker containers to run all training jobs and inference endpoints.
+
+The Docker images are built from the Dockerfiles specified in
+`Docker/ `__.
+
+The Dockerfiles are grouped based on PyTorch version and separated
+based on Python version and processor type.
+
+The Docker images used to run training and inference jobs are built from
+the corresponding "base" and "final" Dockerfiles.
+
+Base Images
+~~~~~~~~~~~
+
+The "base" Dockerfile encompasses the installation of the framework and all of the
+needed dependencies.
+
+The tagging scheme is based on <framework-version>-<processor>-py<python-version> (e.g. 1.0.0-cpu-py3).
+
+All "final" Dockerfiles build images using base images that use the tagging scheme
+above.
+
+If you want to build your base Docker image, then use:
+
+::
+
+    # All build instructions assume you're building from the root directory of the sagemaker-pytorch-container.
+
+    # CPU
+    docker build -t pytorch-base:<framework-version>-cpu-py<python-version> -f docker/<framework-version>/base/Dockerfile.cpu --build-arg py_version=<python-version> .
+
+    # GPU
+    docker build -t pytorch-base:<framework-version>-gpu-py<python-version> -f docker/<framework-version>/base/Dockerfile.gpu --build-arg py_version=<python-version> .
+
+::
+
+    # Example
+
+    # CPU
+    docker build -t pytorch-base:1.0.0-cpu-py3 -f docker/1.0.0/base/Dockerfile.cpu --build-arg py_version=3 .
+
+    # GPU
+    docker build -t pytorch-base:1.0.0-gpu-py3 -f docker/1.0.0/base/Dockerfile.gpu --build-arg py_version=3 .
+
+Final Images
+~~~~~~~~~~~~
+
+The "final" Dockerfiles encompass the installation of the SageMaker-specific support code.
+
+All "final" Dockerfiles use `base images for building `__.
+
+These "base" images are specified with the naming convention of
+pytorch-base:<framework-version>-<processor>-py<python-version>.
+
+Before building "final" images:
+
+Build your "base" image. Make sure it is named and tagged in accordance with your "final"
+Dockerfile.
+
+
+::
+
+    # Create the SageMaker PyTorch Serving Container Python package.
+    cd sagemaker-pytorch-container
+    python setup.py bdist_wheel
+
+If you want to build "final" Docker images, then use:
+
+::
+
+    # All build instructions assume you're building from the root directory of the sagemaker-pytorch-container.
+
+    # CPU
+    docker build -t <image-name>:<tag> -f docker/<framework-version>/final/Dockerfile.cpu --build-arg py_version=<python-version> .
+
+    # GPU
+    docker build -t <image-name>:<tag> -f docker/<framework-version>/final/Dockerfile.gpu --build-arg py_version=<python-version> .
+
+::
+
+    # Example
+
+    # CPU
+    docker build -t preprod-pytorch:1.0.0-cpu-py3 -f docker/1.0.0/final/Dockerfile.cpu --build-arg py_version=3 .
+
+    # GPU
+    docker build -t preprod-pytorch:1.0.0-gpu-py3 -f docker/1.0.0/final/Dockerfile.gpu --build-arg py_version=3 .
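+
+Once built, a "final" image can be used for inference through the SageMaker Python SDK by
+passing it to ``PyTorchModel``, as the integration tests in this repository do. A minimal
+sketch (the image tag, model data location, role, and entry point below are placeholder
+values, not ones defined by this repository):
+
+::
+
+    # Hypothetical usage sketch -- substitute your own image tag, model data, role, and script.
+    from sagemaker.pytorch import PyTorchModel
+
+    model = PyTorchModel(model_data='s3://my-bucket/model.tar.gz',
+                         role='MySageMakerRole',
+                         entry_point='inference.py',
+                         image='preprod-pytorch:1.0.0-cpu-py3')
+    predictor = model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge')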
+
+
+Running the tests
+-----------------
+
+Running the tests requires installation of the SageMaker PyTorch Serving Container code and its test
+dependencies.
+
+::
+
+    git clone https://github.com/aws/sagemaker-pytorch-container.git
+    cd sagemaker-pytorch-container
+    pip install -e .[test]
+
+Tests are defined in
+`test/ `__
+and include unit, local integration, and SageMaker integration tests.
+
+Unit Tests
+~~~~~~~~~~
+
+If you want to run unit tests, then use:
+
+::
+
+    # All test instructions should be run from the top level directory
+
+    pytest test/unit
+
+    # or you can use tox to run unit tests as well as flake8 and code coverage
+
+    tox
+
+
+Local Integration Tests
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Running local integration tests requires `Docker `__ and `AWS
+credentials `__,
+as the local integration tests make calls to a couple of AWS services. The local integration tests and
+SageMaker integration tests require configurations specified within their respective
+`conftest.py `__.
+
+Local integration tests on GPU require `Nvidia-Docker `__.
+
+Before running local integration tests:
+
+#. Build your Docker image.
+#. Pass in the correct pytest arguments to run tests against your Docker image.
+
+If you want to run local integration tests, then use:
+
+::
+
+    # Required arguments for integration tests are found in test/conftest.py
+
+    pytest test/integration/local --docker-base-name <your_docker_image> \
+                      --tag <your_docker_image_tag> \
+                      --py-version <2_or_3> \
+                      --framework-version <PyTorch_version> \
+                      --processor <cpu_or_gpu>
+
+::
+
+    # Example
+    pytest test/integration/local --docker-base-name preprod-pytorch \
+                      --tag 1.0 \
+                      --py-version 3 \
+                      --framework-version 1.0.0 \
+                      --processor cpu
+
+SageMaker Integration Tests
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+SageMaker integration tests require your Docker image to be within an `Amazon ECR repository `__.
+
+The Docker base name is your `ECR repository namespace `__.
+
+The instance type is your specified `Amazon SageMaker Instance Type
+`__ that the SageMaker integration test will run on.
+
+Before running SageMaker integration tests:
+
+#. Build your Docker image.
+#. Push the image to your ECR repository.
+#. Pass in the correct pytest arguments to run tests on SageMaker against the image within your ECR repository.
+
+If you want to run a SageMaker integration end-to-end test on `Amazon
+SageMaker `__, then use:
+
+::
+
+    # Required arguments for integration tests are found in test/conftest.py
+
+    pytest test/integration/sagemaker --aws-id <your_aws_id> \
+                       --docker-base-name <your_docker_image> \
+                       --instance-type <amazon_sagemaker_instance_type> \
+                       --tag <your_docker_image_tag>
+
+::
+
+    # Example
+    pytest test/integration/sagemaker --aws-id 12345678910 \
+                       --docker-base-name preprod-pytorch \
+                       --instance-type ml.m4.xlarge \
+                       --tag 1.0
+
+Contributing
+------------
+
+Please read
+`CONTRIBUTING.md `__
+for details on our code of conduct and the process for submitting pull
+requests to us.
+
+License
+-------
+
+SageMaker PyTorch Serving Container is licensed under the Apache 2.0 License. It is copyright 2018
+Amazon.com, Inc. or its affiliates. All Rights Reserved.
+The license is available at:
+http://aws.amazon.com/apache2.0/
diff --git a/VERSION b/VERSION
new file mode 100644
index 00000000..cfa0775d
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+1.0.6.dev0
diff --git a/buildspec.yml b/buildspec.yml
new file mode 100644
index 00000000..9c54880c
--- /dev/null
+++ b/buildspec.yml
@@ -0,0 +1,115 @@
+version: 0.2
+
+env:
+  variables:
+    FRAMEWORK_VERSION: '1.1.0'
+    CPU_PY_VERSION: '3'
+    CPU_INSTANCE_TYPE: 'ml.c4.xlarge'
+    GPU_PY_VERSION: '3'
+    GPU_INSTANCE_TYPE: 'ml.p2.xlarge'
+    LOCAL_BASE_REPO: 'pytorch-base'
+    ECR_REPO: 'sagemaker-test'
+    GITHUB_REPO: 'sagemaker-pytorch-serving-container'
+    SETUP_FILE: 'setup_cmds.sh'
+    SETUP_CMDS: '#!/bin/bash\npip install --upgrade pip\npip install -U -e .\npip install -U -e .[test]'
+
+
+phases:
+  pre_build:
+    commands:
+      - start-dockerd
+      - ACCOUNT=$(aws sts get-caller-identity --query 'Account' --output text)
+      - PREPROD_IMAGE="$ACCOUNT.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$ECR_REPO"
+      - PR_NUM=$(echo $CODEBUILD_SOURCE_VERSION | grep -o '[0-9]\+')
+      - BUILD_ID="$(echo $CODEBUILD_BUILD_ID | sed -e 's/:/-/g')"
+      - echo 'Pull request number:' $PR_NUM '. No value means this build is not from a pull request.'
+
+  build:
+    commands:
+      # install
+      - pip3 install -U -e .
+      - pip3 install -U -e .[test]
+
+      # run unit tests
+      - pytest test/unit
+
+      # build cpu base image
+      - base_dir="docker/$FRAMEWORK_VERSION/base"
+      - cpu_py3_base_tag="$FRAMEWORK_VERSION-cpu-py3"
+      - cpu_dockerfile="Dockerfile.cpu"
+      - cd $base_dir
+      - docker build -t $LOCAL_BASE_REPO:$cpu_py3_base_tag -f $cpu_dockerfile .
+      - cd ../../../
+
+      # build gpu base image
+      - gpu_py3_base_tag="$FRAMEWORK_VERSION-gpu-py3"
+      - gpu_dockerfile="Dockerfile.gpu"
+      - cd $base_dir
+      - docker build -t $LOCAL_BASE_REPO:$gpu_py3_base_tag -f $gpu_dockerfile .
+      - cd ../../../
+
+      # create wheel
+      - python3 setup.py bdist_wheel
+
+      # build cpu image
+      - build_dir="docker/$FRAMEWORK_VERSION/final"
+      - CPU_PY3_TAG="$FRAMEWORK_VERSION-cpu-py3-$BUILD_ID"
+      - docker build -f "$build_dir/$cpu_dockerfile" -t $PREPROD_IMAGE:$CPU_PY3_TAG .
+
+      # build gpu image
+      - GPU_PY3_TAG="$FRAMEWORK_VERSION-gpu-py3-$BUILD_ID"
+      - docker build -f "$build_dir/$gpu_dockerfile" -t $PREPROD_IMAGE:$GPU_PY3_TAG .
+
+      # push images to ecr
+      - $(aws ecr get-login --registry-ids $ACCOUNT --no-include-email --region $AWS_DEFAULT_REGION)
+      - docker push $PREPROD_IMAGE:$CPU_PY3_TAG
+      - docker push $PREPROD_IMAGE:$GPU_PY3_TAG
+
+      # launch remote gpu instance
+      - prefix='ml.'
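+      # ${GPU_INSTANCE_TYPE#"$prefix"} below strips the leading 'ml.' (e.g. 'ml.p2.xlarge' becomes 'p2.xlarge') so the EC2 launch gets a plain instance type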
+ - instance_type=${GPU_INSTANCE_TYPE#"$prefix"} + - create-key-pair + - launch-ec2-instance --instance-type $instance_type --ami-name dlami-ubuntu + + # run cpu integration tests + - | + if has-matching-changes "test/" "tests/" "src/*.py" "docker/*" "buildspec.yml"; then + pytest test/integration/local --region $AWS_DEFAULT_REGION --docker-base-name $PREPROD_IMAGE --framework-version $FRAMEWORK_VERSION --processor cpu --tag $CPU_PY3_TAG + else + echo "skipping cpu integration tests" + fi + + # run gpu integration tests + - | + if has-matching-changes "test/" "tests/" "src/*.py" "docker/*" "buildspec.yml"; then + printf "$SETUP_CMDS" > $SETUP_FILE + py3_cmd="pytest test/integration/local --region $AWS_DEFAULT_REGION --docker-base-name $PREPROD_IMAGE --framework-version $FRAMEWORK_VERSION --processor gpu --tag $GPU_PY3_TAG" + remote-test --github-repo $GITHUB_REPO --test-cmd "$py3_cmd" --setup-file $SETUP_FILE --pr-number "$PR_NUM" + else + echo "skipping gpu integration tests" + fi + + # run cpu sagemaker tests + - | + if has-matching-changes "test/" "tests/" "src/*.py" "docker/*" "buildspec.yml"; then + pytest test/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor cpu --instance-type $CPU_INSTANCE_TYPE --tag $CPU_PY3_TAG + else + echo "skipping cpu sagemaker tests" + fi + + # run gpu sagemaker tests + - | + if has-matching-changes "test/" "tests/" "src/*.py" "docker/*" "buildspec.yml"; then + pytest test/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor gpu --instance-type $GPU_INSTANCE_TYPE --tag $GPU_PY3_TAG + else + echo "skipping gpu sagemaker tests" + fi + + finally: + # shut down remote gpu instance + - cleanup-gpu-instances + - cleanup-key-pairs + + # remove ecr image + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$CPU_PY3_TAG + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$GPU_PY3_TAG diff --git a/docker/1.1.0/base/Dockerfile.cpu b/docker/1.1.0/base/Dockerfile.cpu new file mode 100644 index 00000000..f2d6cf0f --- /dev/null +++ b/docker/1.1.0/base/Dockerfile.cpu @@ -0,0 +1,40 @@ +FROM ubuntu:16.04 + +ENV py_version=3 + +# Validate that arguments are specified +RUN test $py_version || exit 1 + +# Install python and nginx +RUN apt-get update && apt-get install -y --no-install-recommends software-properties-common && \ + add-apt-repository ppa:deadsnakes/ppa -y && \ + apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + jq \ + libsm6 \ + libxext6 \ + libxrender-dev \ + nginx && \ + if [ $py_version -eq 3 ]; \ + then apt-get install -y --no-install-recommends python3.6-dev \ + && ln -s -f /usr/bin/python3.6 /usr/bin/python; \ + else apt-get install -y --no-install-recommends python-dev; fi && \ + rm -rf /var/lib/apt/lists/* + +# Install pip +RUN cd /tmp && \ + curl -O https://bootstrap.pypa.io/get-pip.py && \ + python get-pip.py 'pip<=18.1' && rm get-pip.py + +# Python won’t try to write .pyc or .pyo files on the import of source modules +# Force stdin, stdout and stderr to be totally unbuffered. 
Good for logging +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PYTHONIOENCODING=UTF-8 LANG=C.UTF-8 LC_ALL=C.UTF-8 + +# Install dependencies from pip +RUN if [ $py_version -eq 3 ]; \ + then pip install --no-cache-dir https://download.pytorch.org/whl/cpu/torch-1.1.0-cp36-cp36m-linux_x86_64.whl \ + https://download.pytorch.org/whl/cpu/torchvision-0.3.0-cp36-cp36m-linux_x86_64.whl fastai==1.0.39; \ + else pip install --no-cache-dir https://download.pytorch.org/whl/cpu/torch-1.1.0-cp27-cp27mu-linux_x86_64.whl \ + https://download.pytorch.org/whl/cpu/torchvision-0.3.0-cp27-cp27mu-linux_x86_64.whl; fi && \ + pip install --no-cache-dir 'opencv-python>=4.0,<4.1' Pillow retrying six diff --git a/docker/1.1.0/base/Dockerfile.gpu b/docker/1.1.0/base/Dockerfile.gpu new file mode 100644 index 00000000..c7a78441 --- /dev/null +++ b/docker/1.1.0/base/Dockerfile.gpu @@ -0,0 +1,35 @@ +FROM nvidia/cuda:10.1-cudnn7-runtime-ubuntu16.04 + +ENV py_version=3 + +# Validate that arguments are specified +RUN test $py_version || exit 1 + +# Install python and nginx +RUN apt-get update && apt-get install -y --no-install-recommends software-properties-common && \ + add-apt-repository ppa:deadsnakes/ppa -y && \ + apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + jq \ + libsm6 \ + libxext6 \ + libxrender-dev \ + nginx && \ + if [ $py_version -eq 3 ]; \ + then apt-get install -y --no-install-recommends python3.6-dev \ + && ln -s -f /usr/bin/python3.6 /usr/bin/python; \ + else apt-get install -y --no-install-recommends python-dev; fi && \ + rm -rf /var/lib/apt/lists/* + +# Install pip +RUN cd /tmp && \ + curl -O https://bootstrap.pypa.io/get-pip.py && \ + python get-pip.py 'pip<=18.1' && rm get-pip.py + +# Python won’t try to write .pyc or .pyo files on the import of source modules +# Force stdin, stdout and stderr to be totally unbuffered. 
Good for logging
+ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PYTHONIOENCODING=UTF-8 LANG=C.UTF-8 LC_ALL=C.UTF-8
+
+RUN pip install --no-cache-dir 'opencv-python>=4.0,<4.1' Pillow retrying six torch==1.1.0 torchvision==0.3.0 && \
+    if [ $py_version -eq 3 ]; then pip install --no-cache-dir fastai==1.0.39; fi
diff --git a/docker/1.1.0/final/Dockerfile.cpu b/docker/1.1.0/final/Dockerfile.cpu
new file mode 100644
index 00000000..3705978b
--- /dev/null
+++ b/docker/1.1.0/final/Dockerfile.cpu
@@ -0,0 +1,17 @@
+FROM pytorch-base:1.1.0-cpu-py3
+
+LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true
+
+# Copy workaround script for incorrect hostname
+COPY lib/changehostname.c /
+COPY lib/start_with_right_hostname.sh /usr/local/bin/start_with_right_hostname.sh
+RUN chmod +x /usr/local/bin/start_with_right_hostname.sh
+
+COPY dist/sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl
+RUN pip install --no-cache-dir /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl && \
+    rm /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl
+
+ENV SAGEMAKER_SERVING_MODULE sagemaker_pytorch_serving_container.serving:main
+
+# Starts framework
+ENTRYPOINT ["bash", "-m", "start_with_right_hostname.sh"]
diff --git a/docker/1.1.0/final/Dockerfile.gpu b/docker/1.1.0/final/Dockerfile.gpu
new file mode 100644
index 00000000..4eb98f88
--- /dev/null
+++ b/docker/1.1.0/final/Dockerfile.gpu
@@ -0,0 +1,17 @@
+FROM pytorch-base:1.1.0-gpu-py3
+
+LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true
+
+# Copy workaround script for incorrect hostname
+COPY lib/changehostname.c /
+COPY lib/start_with_right_hostname.sh /usr/local/bin/start_with_right_hostname.sh
+RUN chmod +x /usr/local/bin/start_with_right_hostname.sh
+
+COPY dist/sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl
+RUN pip install --no-cache-dir /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl && \
+    rm /sagemaker_pytorch_serving_container-1.2-py2.py3-none-any.whl
+
+ENV SAGEMAKER_SERVING_MODULE sagemaker_pytorch_serving_container.serving:main
+
+# Starts framework
+ENTRYPOINT ["bash", "-m", "start_with_right_hostname.sh"]
diff --git a/lib/changehostname.c b/lib/changehostname.c
new file mode 100644
index 00000000..1ae5ee1a
--- /dev/null
+++ b/lib/changehostname.c
@@ -0,0 +1,18 @@
+#include <stddef.h>
+#include <string.h>
+
+/*
+ * Modifies gethostname to return algo-1, algo-2, etc. when running on SageMaker.
+ *
+ * Without this, gethostname() on SageMaker returns 'aws', leading NCCL/MPI to think there is only one host,
+ * not realizing that it needs to use NET/Socket.
+ *
+ * When the Docker container starts, we read the 'current_host' value from /opt/ml/input/config/resourceconfig.json
+ * and replace PLACEHOLDER_HOSTNAME with it before compiling this code into a shared library.
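+ * The compiled library is loaded via LD_PRELOAD (see lib/start_with_right_hostname.sh), so this
+ * gethostname() overrides the libc version for the training process.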
+ */
+int gethostname(char *name, size_t len)
+{
+  const char *val = PLACEHOLDER_HOSTNAME;
+  strncpy(name, val, len);
+  return 0;
+}
diff --git a/lib/start_with_right_hostname.sh b/lib/start_with_right_hostname.sh
new file mode 100644
index 00000000..662cb15b
--- /dev/null
+++ b/lib/start_with_right_hostname.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ "$1" = 'train' ]
+then
+  CURRENT_HOST=$(jq .current_host /opt/ml/input/config/resourceconfig.json)
+
+  sed -ie "s/PLACEHOLDER_HOSTNAME/$CURRENT_HOST/g" changehostname.c
+
+  gcc -o changehostname.o -c -fPIC -Wall changehostname.c
+  gcc -o libchangehostname.so -shared -export-dynamic changehostname.o -ldl
+
+  LD_PRELOAD=/libchangehostname.so train
+else
+  serve
+fi
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..1274dfb7
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,6 @@
+[tool:pytest]
+addopts =
+    --verbose
+
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..800be23b
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,56 @@
+# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import os
+from glob import glob
+from os.path import basename
+from os.path import splitext
+
+from setuptools import setup, find_packages
+
+
+def read(fname):
+    return open(os.path.join(os.path.dirname(__file__), fname)).read()
+
+
+setup(
+    name='sagemaker_pytorch_serving_container',
+    version='1.2',
+    description='Open source library for creating PyTorch containers to run on Amazon SageMaker.',
+
+    packages=find_packages(where='src', exclude=('test',)),
+    package_dir={'': 'src'},
+    py_modules=[splitext(basename(path))[0] for path in glob('src/*.py')],
+
+    long_description=read('README.rst'),
+    author='Amazon Web Services',
+    license='Apache License 2.0',
+
+    classifiers=[
+        "Development Status :: 5 - Production/Stable",
+        "Intended Audience :: Developers",
+        "Natural Language :: English",
+        "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python",
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3.6',
+    ],
+    install_requires=['numpy==1.16.4', 'Pillow==6.0.0', 'retrying==1.3.3', 'sagemaker-containers==2.5.4',
+                      'six==1.12.0', 'torch==1.1.0'],
+    extras_require={
+        'test': ['boto3==1.9.169', 'coverage==4.5.3', 'docker-compose==1.23.2', 'flake8==3.7.7', 'Flask==1.0.2',
+                 'mock==2.0.0', 'pytest==4.4.0', 'pytest-cov==2.7.1', 'pytest-xdist==1.28.0', 'PyYAML==3.10',
+                 'sagemaker==1.28.1', 'torchvision==0.3.0', 'tox==3.7.0']
+    },
+)
diff --git a/src/sagemaker_pytorch_serving_container/__init__.py b/src/sagemaker_pytorch_serving_container/__init__.py
new file mode 100644
index 00000000..74f14335
--- /dev/null
+++ b/src/sagemaker_pytorch_serving_container/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License.
A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
diff --git a/src/sagemaker_pytorch_serving_container/serving.py b/src/sagemaker_pytorch_serving_container/serving.py
new file mode 100644
index 00000000..50d4bb00
--- /dev/null
+++ b/src/sagemaker_pytorch_serving_container/serving.py
@@ -0,0 +1,112 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import logging
+
+import torch
+from sagemaker_containers.beta.framework import (content_types, encoders, env, modules, transformer,
+                                                 worker)
+
+logger = logging.getLogger(__name__)
+
+
+def default_model_fn(model_dir):
+    """Loads a model. For PyTorch, a default function to load a model cannot be provided.
+    Users should provide a customized model_fn() in their script.
+
+    Args:
+        model_dir: a directory where the model is saved.
+
+    Returns: A PyTorch model.
+    """
+    return transformer.default_model_fn(model_dir)
+
+
+def default_input_fn(input_data, content_type):
+    """A default input_fn that can handle JSON, CSV and NPZ formats.
+
+    Args:
+        input_data: the request payload serialized in the content_type format
+        content_type: the request content_type
+
+    Returns: input_data deserialized into torch.FloatTensor or torch.cuda.FloatTensor,
+        depending on whether CUDA is available.
+    """
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+    np_array = encoders.decode(input_data, content_type)
+    tensor = torch.FloatTensor(
+        np_array) if content_type in content_types.UTF8_TYPES else torch.from_numpy(np_array)
+    return tensor.to(device)
+
+
+def default_predict_fn(data, model):
+    """A default predict_fn for PyTorch. Calls a model on data deserialized in input_fn.
+    Runs prediction on GPU if CUDA is available.
+
+    Args:
+        data: input data (torch.Tensor) for prediction deserialized by input_fn
+        model: PyTorch model loaded in memory by model_fn
+
+    Returns: a prediction
+    """
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+    model.to(device)
+    input_data = data.to(device)
+    model.eval()
+    with torch.no_grad():
+        output = model(input_data)
+    return output
+
+
+def default_output_fn(prediction, accept):
+    """A default output_fn for PyTorch. Serializes predictions from predict_fn to JSON, CSV or NPZ format.
+
+    Args:
+        prediction: a prediction result from predict_fn
+        accept: the content type to which the output data should be serialized
+
+    Returns: the serialized output data
+    """
+    if isinstance(prediction, torch.Tensor):
+        prediction = prediction.detach().cpu().numpy()
+
+    return worker.Response(response=encoders.encode(prediction, accept), mimetype=accept)
+
+
+def _user_module_transformer(user_module):
+    model_fn = getattr(user_module, 'model_fn', default_model_fn)
+    input_fn = getattr(user_module, 'input_fn', default_input_fn)
+    predict_fn = getattr(user_module, 'predict_fn', default_predict_fn)
+    output_fn = getattr(user_module, 'output_fn', default_output_fn)
+
+    return transformer.Transformer(model_fn=model_fn, input_fn=input_fn, predict_fn=predict_fn,
+                                   output_fn=output_fn)
+
+
+app = None
+
+
+def main(environ, start_response):
+    global app
+    if app is None:
+        serving_env = env.ServingEnv()
+        user_module = modules.import_module(serving_env.module_dir, serving_env.module_name)
+
+        user_module_transformer = _user_module_transformer(user_module)
+
+        user_module_transformer.initialize()
+
+        app = worker.Worker(transform_fn=user_module_transformer.transform,
+                            module_name=serving_env.module_name)
+
+    return app(environ, start_response)
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 00000000..74f14335
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 00000000..3c72a578
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,179 @@
+# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
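+# Defines the pytest command-line options (see pytest_addoption below) and the shared
+# fixtures used by the unit, local integration, and SageMaker integration tests.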
+from __future__ import absolute_import + +import boto3 +import os +import logging +import platform +import pytest +import shutil +import sys +import tempfile + +from sagemaker import LocalSession, Session +from sagemaker.pytorch import PyTorch + +from test.utils import image_utils + +logger = logging.getLogger(__name__) +logging.getLogger('boto').setLevel(logging.INFO) +logging.getLogger('boto3').setLevel(logging.INFO) +logging.getLogger('botocore').setLevel(logging.INFO) +logging.getLogger('factory.py').setLevel(logging.INFO) +logging.getLogger('auth.py').setLevel(logging.INFO) +logging.getLogger('connectionpool.py').setLevel(logging.INFO) + + +dir_path = os.path.dirname(os.path.realpath(__file__)) + +NO_P2_REGIONS = ['ap-northeast-3', 'ap-southeast-2', 'ca-central-1', 'eu-central-1', + 'eu-west-2', 'us-west-1'] +NO_P3_REGIONS = ['ap-northeast-3', 'ap-southeast-1', 'ap-southeast-2', 'ap-south-1', 'ca-central-1', + 'eu-central-1', 'eu-west-2', 'us-west-1'] + + +def pytest_addoption(parser): + parser.addoption('--build-image', '-D', action='store_true') + parser.addoption('--build-base-image', '-B', action='store_true') + parser.addoption('--aws-id') + parser.addoption('--instance-type') + parser.addoption('--docker-base-name', default='pytorch') + parser.addoption('--region', default='us-west-2') + parser.addoption('--framework-version', default=PyTorch.LATEST_VERSION) + parser.addoption('--processor', choices=['gpu', 'cpu'], default='cpu') + # If not specified, will default to {framework-version}-{processor}-py3 + parser.addoption('--tag', default=None) + + +@pytest.fixture(scope='session', name='docker_base_name') +def fixture_docker_base_name(request): + return request.config.getoption('--docker-base-name') + + +@pytest.fixture(scope='session', name='region') +def fixture_region(request): + return request.config.getoption('--region') + + +@pytest.fixture(scope='session', name='framework_version') +def fixture_framework_version(request): + return request.config.getoption('--framework-version') + + +@pytest.fixture(scope='session', name='processor') +def fixture_processor(request): + return request.config.getoption('--processor') + + +@pytest.fixture(scope='session', name='tag') +def fixture_tag(request, framework_version, processor): + provided_tag = request.config.getoption('--tag') + default_tag = '{}-{}-py3'.format(framework_version, processor) + return provided_tag if provided_tag else default_tag + + +@pytest.fixture(scope='session', name='docker_image') +def fixture_docker_image(docker_base_name, tag): + return '{}:{}'.format(docker_base_name, tag) + + +@pytest.fixture +def opt_ml(): + tmp = tempfile.mkdtemp() + os.mkdir(os.path.join(tmp, 'output')) + + # Docker cannot mount Mac OS /var folder properly see + # https://forums.docker.com/t/var-folders-isnt-mounted-properly/9600 + opt_ml_dir = '/private{}'.format(tmp) if platform.system() == 'Darwin' else tmp + yield opt_ml_dir + + shutil.rmtree(tmp, True) + + +@pytest.fixture(scope='session', name='use_gpu') +def fixture_use_gpu(processor): + return processor == 'gpu' + + +@pytest.fixture(scope='session', name='build_base_image', autouse=True) +def fixture_build_base_image(request, framework_version, processor, tag, docker_base_name): + build_base_image = request.config.getoption('--build-base-image') + if build_base_image: + return image_utils.build_base_image(framework_name=docker_base_name, + framework_version=framework_version, + base_image_tag=tag, + processor=processor, + cwd=os.path.join(dir_path, '..')) + + return tag + + 
+@pytest.fixture(scope='session', name='build_image', autouse=True)
+def fixture_build_image(request, framework_version, processor, tag, docker_base_name):
+    build_image = request.config.getoption('--build-image')
+    if build_image:
+        return image_utils.build_image(framework_name=docker_base_name,
+                                       framework_version=framework_version,
+                                       processor=processor,
+                                       tag=tag,
+                                       cwd=os.path.join(dir_path, '..'))
+
+    return tag
+
+
+@pytest.fixture(scope='session', name='sagemaker_session')
+def fixture_sagemaker_session(region):
+    return Session(boto_session=boto3.Session(region_name=region))
+
+
+@pytest.fixture(scope='session', name='sagemaker_local_session')
+def fixture_sagemaker_local_session(region):
+    return LocalSession(boto_session=boto3.Session(region_name=region))
+
+
+@pytest.fixture(name='aws_id', scope='session')
+def fixture_aws_id(request):
+    return request.config.getoption('--aws-id')
+
+
+@pytest.fixture(name='instance_type', scope='session')
+def fixture_instance_type(request, processor):
+    provided_instance_type = request.config.getoption('--instance-type')
+    default_instance_type = 'local' if processor == 'cpu' else 'local_gpu'
+    return provided_instance_type or default_instance_type
+
+
+@pytest.fixture(name='docker_registry', scope='session')
+def fixture_docker_registry(aws_id, region):
+    return '{}.dkr.ecr.{}.amazonaws.com'.format(aws_id, region)
+
+
+@pytest.fixture(name='ecr_image', scope='session')
+def fixture_ecr_image(docker_registry, docker_base_name, tag):
+    return '{}/{}:{}'.format(docker_registry, docker_base_name, tag)
+
+
+@pytest.fixture(autouse=True)
+def skip_by_device_type(request, use_gpu, instance_type):
+    is_gpu = use_gpu or instance_type[3] in ['g', 'p']
+    if (request.node.get_closest_marker('skip_gpu') and is_gpu) or \
+            (request.node.get_closest_marker('skip_cpu') and not is_gpu):
+        pytest.skip('Skipping because running on \'{}\' instance'.format(instance_type))
+
+
+@pytest.fixture(autouse=True)
+def skip_gpu_instance_restricted_regions(region, instance_type):
+    if ((region in NO_P2_REGIONS and instance_type.startswith('ml.p2')) or
+            (region in NO_P3_REGIONS and instance_type.startswith('ml.p3'))):
+        pytest.skip('Skipping GPU test in region {}'.format(region))
diff --git a/test/integration/__init__.py b/test/integration/__init__.py
new file mode 100644
index 00000000..2b5e8db2
--- /dev/null
+++ b/test/integration/__init__.py
@@ -0,0 +1,36 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import + +import os + +resources_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'resources')) +mnist_path = os.path.join(resources_path, 'mnist') +mnist_script = os.path.join(mnist_path, 'mnist.py') +fastai_path = os.path.join(resources_path, 'fastai') +fastai_cifar_script = os.path.join(fastai_path, 'train_cifar.py') +fastai_mnist_script = os.path.join(fastai_path, 'mnist.py') +data_dir = os.path.join(mnist_path, 'data') +training_dir = os.path.join(data_dir, 'training') +dist_operations_path = os.path.join(resources_path, 'distributed_operations.py') + +mnist_1d_script = os.path.join(mnist_path, 'mnist_1d.py') +model_cpu_dir = os.path.join(mnist_path, 'model_cpu') +model_cpu_1d_dir = os.path.join(model_cpu_dir, '1d') +model_gpu_dir = os.path.join(mnist_path, 'model_gpu') +model_gpu_1d_dir = os.path.join(model_gpu_dir, '1d') +call_model_fn_once_script = os.path.join(resources_path, 'call_model_fn_once.py') + +ROLE = 'dummy/unused-role' +DEFAULT_TIMEOUT = 20 +PYTHON3 = 'py3' diff --git a/test/integration/local/__init__.py b/test/integration/local/__init__.py new file mode 100644 index 00000000..74f14335 --- /dev/null +++ b/test/integration/local/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import diff --git a/test/integration/local/test_serving.py b/test/integration/local/test_serving.py new file mode 100644 index 00000000..613afb71 --- /dev/null +++ b/test/integration/local/test_serving.py @@ -0,0 +1,137 @@ +# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
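+# Local-mode serving tests: deploy the container through SageMaker Python SDK local mode
+# and exercise JSON, CSV and NPY round trips against pre-trained MNIST models.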
+from __future__ import absolute_import
+
+from contextlib import contextmanager
+
+import numpy as np
+import pytest
+import torch
+import torch.utils.data
+import torch.utils.data.distributed
+from sagemaker.pytorch import PyTorchModel
+from sagemaker.predictor import BytesDeserializer, csv_deserializer, csv_serializer, \
+    json_deserializer, json_serializer, npy_serializer, numpy_deserializer
+from sagemaker_containers.beta.framework import content_types
+from torchvision import datasets, transforms
+
+from test.integration import training_dir, mnist_script, mnist_1d_script, model_cpu_dir, \
+    model_gpu_dir, model_cpu_1d_dir, call_model_fn_once_script, ROLE
+from test.utils import local_mode_utils
+
+CONTENT_TYPE_TO_SERIALIZER_MAP = {
+    content_types.CSV: csv_serializer,
+    content_types.JSON: json_serializer,
+    content_types.NPY: npy_serializer,
+}
+
+ACCEPT_TYPE_TO_DESERIALIZER_MAP = {
+    content_types.CSV: csv_deserializer,
+    content_types.JSON: json_deserializer,
+    content_types.NPY: numpy_deserializer,
+}
+
+
+@pytest.fixture(name='test_loader')
+def fixture_test_loader():
+    # Largest batch size is only 300 because client_max_body_size is 5M
+    return _get_test_data_loader(batch_size=300)
+
+
+def test_serve_json_npy(test_loader, use_gpu, docker_image, sagemaker_local_session, instance_type):
+    model_dir = model_gpu_dir if use_gpu else model_cpu_dir
+    with _predictor(model_dir, mnist_script, docker_image, sagemaker_local_session,
+                    instance_type) as predictor:
+        for content_type in (content_types.JSON, content_types.NPY):
+            for accept in (content_types.JSON, content_types.CSV, content_types.NPY):
+                _assert_prediction_npy_json(predictor, test_loader, content_type, accept)
+
+
+def test_serve_csv(test_loader, use_gpu, docker_image, sagemaker_local_session, instance_type):
+    with _predictor(model_cpu_1d_dir, mnist_1d_script, docker_image, sagemaker_local_session,
+                    instance_type) as predictor:
+        for accept in (content_types.JSON, content_types.CSV, content_types.NPY):
+            _assert_prediction_csv(predictor, test_loader, accept)
+
+
+@pytest.mark.skip_cpu
+def test_serve_cpu_model_on_gpu(test_loader, docker_image, sagemaker_local_session, instance_type):
+    with _predictor(model_cpu_1d_dir, mnist_1d_script, docker_image, sagemaker_local_session,
+                    instance_type) as predictor:
+        _assert_prediction_npy_json(predictor, test_loader, content_types.NPY, content_types.JSON)
+
+
+def test_serving_calls_model_fn_once(docker_image, sagemaker_local_session, instance_type):
+    with _predictor(model_cpu_dir, call_model_fn_once_script, docker_image, sagemaker_local_session,
+                    instance_type, model_server_workers=2) as predictor:
+        predictor.accept = None
+        predictor.deserializer = BytesDeserializer()
+
+        # call enough times to ensure multiple requests to a worker
+        for _ in range(3):
+            # the server will return a 500 error if model_fn is called during request handling
+            output = predictor.predict(b'input')
+            assert output == b'output'
+
+
+@contextmanager
+def _predictor(model_dir, script, image, sagemaker_local_session, instance_type,
+               model_server_workers=None):
+    model = PyTorchModel('file://{}'.format(model_dir),
+                         ROLE,
+                         script,
+                         image=image,
+                         sagemaker_session=sagemaker_local_session,
+                         model_server_workers=model_server_workers)
+
+    with local_mode_utils.lock():
+        # deploy outside the try block so a failed deploy doesn't leave
+        # 'predictor' unbound when the finally clause runs
+        predictor = model.deploy(1, instance_type)
+        try:
+            yield predictor
+        finally:
+            predictor.delete_endpoint()
+
+
+def _assert_prediction_npy_json(predictor, test_loader, content_type, accept):
+    predictor.content_type = content_type
+    predictor.serializer
= CONTENT_TYPE_TO_SERIALIZER_MAP[content_type] + predictor.accept = accept + predictor.deserializer = ACCEPT_TYPE_TO_DESERIALIZER_MAP[accept] + + data = _get_mnist_batch(test_loader).numpy() + output = predictor.predict(data) + + assert np.asarray(output).shape == (test_loader.batch_size, 10) + + +def _assert_prediction_csv(predictor, test_loader, accept): + predictor.accept = accept + predictor.deserializer = ACCEPT_TYPE_TO_DESERIALIZER_MAP[accept] + + data = _get_mnist_batch(test_loader).view(test_loader.batch_size, -1) + output = predictor.predict(data) + assert np.asarray(output).shape == (test_loader.batch_size, 10) + + +def _get_test_data_loader(batch_size): + return torch.utils.data.DataLoader( + datasets.MNIST(training_dir, train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])), + batch_size=batch_size, shuffle=True) + + +def _get_mnist_batch(test_loader): + for data in test_loader: + return data[0] diff --git a/test/integration/sagemaker/__init__.py b/test/integration/sagemaker/__init__.py new file mode 100644 index 00000000..74f14335 --- /dev/null +++ b/test/integration/sagemaker/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import diff --git a/test/integration/sagemaker/test_mnist.py b/test/integration/sagemaker/test_mnist.py new file mode 100644 index 00000000..da5f422e --- /dev/null +++ b/test/integration/sagemaker/test_mnist.py @@ -0,0 +1,62 @@ +# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
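+# SageMaker endpoint test: uploads a pre-trained MNIST model, deploys it to a real
+# endpoint from the ECR image under test, and checks the prediction output shape.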
+from __future__ import absolute_import
+
+import os
+
+import numpy as np
+import pytest
+import sagemaker
+from sagemaker.pytorch import PyTorchModel
+
+from test.integration import mnist_script, model_cpu_dir
+from test.integration.sagemaker.timeout import timeout_and_delete_endpoint
+
+
+@pytest.mark.skip_gpu
+def test_mnist_distributed_cpu(sagemaker_session, ecr_image, instance_type):
+    instance_type = instance_type or 'ml.c4.xlarge'
+    _test_mnist_distributed(sagemaker_session, ecr_image, instance_type)
+
+
+@pytest.mark.skip_cpu
+def test_mnist_distributed_gpu(sagemaker_session, ecr_image, instance_type):
+    instance_type = instance_type or 'ml.p2.xlarge'
+    _test_mnist_distributed(sagemaker_session, ecr_image, instance_type)
+
+
+def _test_mnist_distributed(sagemaker_session, ecr_image, instance_type):
+    model_dir = os.path.join(model_cpu_dir, '1d', 'model.tar.gz')
+
+    endpoint_name = sagemaker.utils.unique_name_from_base("sagemaker-pytorch-serving")
+
+    model_data = sagemaker_session.upload_data(
+        path=model_dir,
+        key_prefix="sagemaker-pytorch-serving/models",
+    )
+
+    pytorch = PyTorchModel(model_data,
+                           'SageMakerRole',
+                           mnist_script,
+                           image=ecr_image,
+                           sagemaker_session=sagemaker_session)
+
+    with timeout_and_delete_endpoint(endpoint_name, sagemaker_session, minutes=30):
+        predictor = pytorch.deploy(initial_instance_count=1, instance_type=instance_type,
+                                   endpoint_name=endpoint_name)
+
+        batch_size = 100
+        data = np.random.rand(batch_size, 1, 28, 28).astype(np.float32)
+        output = predictor.predict(data)
+
+        assert output.shape == (batch_size, 10)
diff --git a/test/integration/sagemaker/timeout.py b/test/integration/sagemaker/timeout.py
new file mode 100644
index 00000000..d98b538d
--- /dev/null
+++ b/test/integration/sagemaker/timeout.py
@@ -0,0 +1,83 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+# TODO: this is used in all containers and sdk. We should move it to container support or sdk test utils.
+from __future__ import absolute_import
+import signal
+from contextlib import contextmanager
+import logging
+
+from botocore.exceptions import ClientError
+
+LOGGER = logging.getLogger('timeout')
+
+
+class TimeoutError(Exception):
+    pass
+
+
+@contextmanager
+def timeout(seconds=0, minutes=0, hours=0):
+    """Add a signal-based timeout to any block of code.
+    If multiple time units are specified, they will be added together to determine the time limit.
+    Usage:
+    with timeout(seconds=5):
+        my_slow_function(...)
+    Args:
+        - seconds: The time limit, in seconds.
+        - minutes: The time limit, in minutes.
+        - hours: The time limit, in hours.
+    """
+
+    limit = seconds + 60 * minutes + 3600 * hours
+
+    def handler(signum, frame):
+        raise TimeoutError('timed out after {} seconds'.format(limit))
+
+    try:
+        signal.signal(signal.SIGALRM, handler)
+        signal.alarm(limit)
+
+        yield
+    finally:
+        signal.alarm(0)
+
+
+@contextmanager
+def timeout_and_delete_endpoint(endpoint_name, sagemaker_session,
+                                seconds=0, minutes=0, hours=0):
+    with timeout(seconds=seconds, minutes=minutes, hours=hours) as t:
+        try:
+            yield [t]
+        finally:
+            try:
+                sagemaker_session.delete_endpoint(endpoint_name)
+                LOGGER.info("deleted endpoint {}".format(endpoint_name))
+            except ClientError as ce:
+                if ce.response['Error']['Code'] == 'ValidationException':
+                    # pass here to avoid overwriting the inner exception
+                    pass
+
+
+@contextmanager
+def timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, seconds=0, minutes=0, hours=0):
+    with timeout(seconds=seconds, minutes=minutes, hours=hours) as t:
+        try:
+            yield [t]
+        finally:
+            try:
+                sagemaker_session.delete_endpoint(endpoint_name)
+                LOGGER.info('deleted endpoint {}'.format(endpoint_name))
+            except ClientError as ce:
+                if ce.response['Error']['Code'] == 'ValidationException':
+                    # pass here to avoid overwriting the inner exception
+                    pass
diff --git a/test/resources/call_model_fn_once.py b/test/resources/call_model_fn_once.py
new file mode 100644
index 00000000..4f726578
--- /dev/null
+++ b/test/resources/call_model_fn_once.py
@@ -0,0 +1,37 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import os
+
+
+def model_fn(model_dir):
+    lock_file = os.path.join(model_dir, 'model_fn.lock.{}'.format(os.getpid()))
+    if os.path.exists(lock_file):
+        raise RuntimeError('model_fn called more than once (lock: {})'.format(lock_file))
+
+    open(lock_file, 'a').close()
+
+    return 'model'
+
+
+def input_fn(data, content_type):
+    return data
+
+
+def predict_fn(data, model):
+    return b'output'
+
+
+def output_fn(prediction, accept):
+    return prediction
diff --git a/test/resources/distributed_operations.py b/test/resources/distributed_operations.py
new file mode 100644
index 00000000..e8045842
--- /dev/null
+++ b/test/resources/distributed_operations.py
@@ -0,0 +1,268 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
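+# Exercises torch.distributed collective operations (send/recv, broadcast, all_reduce,
+# reduce, all_gather, gather, scatter, barrier) across the hosts of a SageMaker training
+# cluster, then writes a 'success' file to the model directory.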
+from __future__ import absolute_import +import argparse +import json +import logging +import os +import sys +import torch +import torch.distributed as dist +from torch.multiprocessing import Process +import torch.utils.data +import torch.utils.data.distributed + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(logging.StreamHandler(sys.stdout)) + + +def _get_tensor(rank, rows, columns): + device = torch.device( + "cuda:{}".format(dist.get_rank() % torch.cuda.device_count()) if torch.cuda.is_available() + else "cpu" + ) + tensor = torch.ones(rows, columns) * (rank + 1) + return tensor.to(device) + + +def _get_zeros_tensor(rows, columns): + device = torch.device( + "cuda:{}".format(dist.get_rank() % torch.cuda.device_count()) if torch.cuda.is_available() + else "cpu" + ) + tensor = torch.zeros(rows, columns) + return tensor.to(device) + + +def _get_zeros_tensors_list(rows, columns): + return [_get_zeros_tensor(rows, columns) for _ in range(dist.get_world_size())] + + +def _get_tensors_sum(rows, columns): + device = torch.device( + "cuda:{}".format(dist.get_rank() % torch.cuda.device_count()) if torch.cuda.is_available() + else "cpu" + ) + result = (1 + dist.get_world_size()) * dist.get_world_size() / 2 + tensor = torch.ones(rows, columns) * result + return tensor.to(device) + + +def _send_recv(rank, rows, columns): + source = 0 + tensor = _get_tensor(rank, rows, columns) + logger.debug('Rank: {},\nTensor BEFORE send_recv: {}'.format(rank, tensor)) + if rank == 0: + for i in range(1, dist.get_world_size()): + dist.send(tensor=tensor, dst=i) + else: + dist.recv(tensor=tensor, src=source) + logger.debug('Rank: {},\nTensor AFTER send_recv: {}\n'.format(rank, tensor)) + + assert torch.equal(tensor, _get_tensor(source, rows, columns)),\ + 'Rank {}: Tensor was not equal to rank {} tensor after send-recv.'.format(rank, source) + + +def _broadcast(rank, rows, columns): + source = 0 + tensor = _get_tensor(rank, rows, columns) + logger.debug('Rank: {},\nTensor BEFORE broadcast: {}'.format(rank, tensor)) + dist.broadcast(tensor, src=source) + logger.debug('Rank: {},\nTensor AFTER broadcast: {}\n'.format(rank, tensor)) + + assert torch.equal(tensor, _get_tensor(source, rows, columns)), \ + 'Rank {}: Tensor was not equal to rank {} tensor after broadcast.'.format(rank, source) + + +def _all_reduce(rank, rows, columns): + tensor = _get_tensor(rank, rows, columns) + logger.debug('Rank: {},\nTensor BEFORE all_reduce: {}'.format(rank, tensor)) + dist.all_reduce(tensor, op=dist.reduce_op.SUM) + logger.debug('Rank: {},\nTensor AFTER all_reduce: {}\n'.format(rank, tensor)) + + assert torch.equal(tensor, _get_tensors_sum(rows, columns)), \ + 'Rank {}: Tensor was not equal to SUM of {} tensors after all_reduce.'.format(rank, dist.get_world_size()) + + +def _reduce(rank, rows, columns): + dest = 0 + tensor = _get_tensor(rank, rows, columns) + logger.debug('Rank: {},\nTensor BEFORE reduce: {}'.format(rank, tensor)) + # this is inplace operation + dist.reduce(tensor, op=dist.reduce_op.SUM, dst=dest) + logger.debug('Rank: {},\nTensor AFTER reduce: {}\n'.format(rank, tensor)) + + if rank == dest: + assert torch.equal(tensor, _get_tensors_sum(rows, columns)), \ + 'Rank {}: Tensor was not equal to SUM of {} tensors after reduce.'.format(rank, dist.get_world_size()) + + +def _all_gather(rank, rows, columns): + tensor = _get_tensor(rank, rows, columns) + tensors_list = _get_zeros_tensors_list(rows, columns) + logger.debug('Rank: {},\nTensor BEFORE all_gather: {}'.format(rank, 
tensor))
+    dist.all_gather(tensors_list, tensor)
+    logger.debug('Rank: {},\nTensor AFTER all_gather: {}. tensors_list: {}\n'.format(
+        rank, tensor, tensors_list))
+
+    # tensor shouldn't have changed
+    assert torch.equal(tensor, _get_tensor(rank, rows, columns)), \
+        'Rank {}: Tensor got changed after all_gather.'.format(rank)
+    for i in range(dist.get_world_size()):
+        assert torch.equal(tensors_list[i], _get_tensor(i, rows, columns)), \
+            'Rank {}: tensors lists are not the same after all_gather.'.format(rank)
+
+
+def _gather(rank, rows, columns):
+    dest = 0
+    tensor = _get_tensor(rank, rows, columns)
+    if rank == dest:
+        tensors_list = _get_zeros_tensors_list(rows, columns)
+        logger.debug('Rank: {},\nTensor BEFORE gather: {}. tensors_list: {}'.format(
+            rank, tensor, tensors_list))
+        dist.gather(tensor=tensor, gather_list=tensors_list)
+        logger.debug('Rank: {},\nTensor AFTER gather: {}. tensors_list: {}\n'.format(
+            rank, tensor, tensors_list))
+        for i in range(dist.get_world_size()):
+            assert torch.equal(tensors_list[i], _get_tensor(i, rows, columns)), \
+                'Rank {}: tensors lists are not the same after gather.'.format(rank)
+    else:
+        logger.debug('Rank: {},\nTensor BEFORE gather: {}\n'.format(rank, tensor))
+        dist.gather(tensor=tensor, dst=dest)
+        logger.debug('Rank: {},\nTensor AFTER gather: {}\n'.format(rank, tensor))
+
+    # tensor shouldn't have changed
+    assert torch.equal(tensor, _get_tensor(rank, rows, columns)), \
+        'Rank {}: Tensor got changed after gather.'.format(rank)
+
+
+def _scatter(rank, rows, columns):
+    source = 0
+    tensor = _get_tensor(rank, rows, columns)
+    if rank == source:
+        tensors_list = _get_zeros_tensors_list(rows, columns)
+        logger.debug('Rank: {},\nTensor BEFORE scatter: {}. tensors_list: {}'.format(
+            rank, tensor, tensors_list))
+        dist.scatter(tensor=tensor, scatter_list=tensors_list)
+    else:
+        logger.debug('Rank: {},\nTensor BEFORE scatter: {}\n'.format(rank, tensor))
+        dist.scatter(tensor=tensor, src=source)
+    logger.debug('Rank: {},\nTensor AFTER scatter: {}\n'.format(rank, tensor))
+
+    assert torch.equal(tensor, _get_zeros_tensor(rows, columns)), \
+        'Rank {}: Tensor should be all zeroes after scatter.'.format(rank)
+
+
+def _barrier(rank):
+    logger.debug('Rank: {}, Waiting for other processes before the barrier.'.format(rank))
+    dist.barrier()
+    logger.debug('Rank: {}, Passing the barrier'.format(rank))
+
+
+def main():
+    print('Starting')
+    parser = argparse.ArgumentParser()
+    # Configurable hyperparameters
+    parser.add_argument('--rows', type=int, default=1,
+                        help='Number of rows in the tensor.')
+    parser.add_argument('--columns', type=int, default=1,
+                        help='Number of columns in the tensor.')
+    parser.add_argument('--backend', type=str, default=None,
+                        help='backend for distributed operations.')
+
+    # Container environment
+    parser.add_argument('--hosts', type=list, default=json.loads(os.environ["SM_HOSTS"]))
+    parser.add_argument('--current-host', type=str, default=os.environ["SM_CURRENT_HOST"])
+    parser.add_argument('--model-dir', type=str, default=os.environ["SM_MODEL_DIR"])
+    parser.add_argument('--num-gpus', type=int, default=os.environ["SM_NUM_GPUS"])
+    parser.add_argument('--num-cpus', type=int, default=os.environ["SM_NUM_CPUS"])
+
+    args = parser.parse_args()
+
+    number_of_processes = args.num_gpus if args.num_gpus > 0 else args.num_cpus
+    world_size = number_of_processes * len(args.hosts)
+    logger.info('Running \'{}\' backend on {} nodes and {} processes.
+
+
+def main():
+    print('Starting')
+    parser = argparse.ArgumentParser()
+    # Configurable hyperparameters
+    parser.add_argument('--rows', type=int, default=1,
+                        help='Number of rows in the tensor.')
+    parser.add_argument('--columns', type=int, default=1,
+                        help='Number of columns in the tensor.')
+    parser.add_argument('--backend', type=str, default=None,
+                        help='backend for distributed operations.')
+
+    # Container environment
+    parser.add_argument('--hosts', type=list, default=json.loads(os.environ["SM_HOSTS"]))
+    parser.add_argument('--current-host', type=str, default=os.environ["SM_CURRENT_HOST"])
+    parser.add_argument('--model-dir', type=str, default=os.environ["SM_MODEL_DIR"])
+    parser.add_argument('--num-gpus', type=int, default=os.environ["SM_NUM_GPUS"])
+    parser.add_argument('--num-cpus', type=int, default=os.environ["SM_NUM_CPUS"])
+
+    args = parser.parse_args()
+
+    number_of_processes = args.num_gpus if args.num_gpus > 0 else args.num_cpus
+    world_size = number_of_processes * len(args.hosts)
+    logger.info('Running \'{}\' backend on {} nodes and {} processes. World size is {}.'.format(
+        args.backend, len(args.hosts), number_of_processes, world_size
+    ))
+    host_rank = args.hosts.index(args.current_host)
+    master_addr = args.hosts[0]
+    master_port = '55555'
+    processes = []
+    for rank in range(number_of_processes):
+        process_rank = host_rank * number_of_processes + rank
+        p = Process(
+            target=init_processes,
+            args=(args.backend,
+                  master_addr,
+                  master_port,
+                  process_rank,
+                  world_size,
+                  args.rows,
+                  args.columns,
+                  args.current_host,
+                  args.num_gpus)
+        )
+        p.start()
+        processes.append(p)
+
+    for p in processes:
+        p.join()
+
+    save('success', args.model_dir)
+
+
+def init_processes(backend, master_addr, master_port, rank, world_size,
+                   rows, columns, host, num_gpus):
+    # Initialize the distributed environment.
+    os.environ['WORLD_SIZE'] = str(world_size)
+    os.environ['RANK'] = str(rank)
+    os.environ['MASTER_ADDR'] = master_addr
+    os.environ['MASTER_PORT'] = master_port
+
+    logger.info('Init process rank {} on host \'{}\''.format(rank, host))
+    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
+    run(backend, rank, rows, columns, num_gpus)
+
+
+def run(backend, rank, rows, columns, num_gpus):
+    # https://pytorch.org/docs/master/distributed.html
+    if backend == 'gloo':
+        print('Run operations supported by \'gloo\' backend.')
+        _broadcast(rank, rows, columns)
+        _all_reduce(rank, rows, columns)
+        _barrier(rank)
+
+        # this operation is supported only on cpu
+        if num_gpus == 0:
+            _send_recv(rank, rows, columns)
+    elif backend == 'nccl':
+        print('Run operations supported by \'nccl\' backend.')
+        # Note: nccl does not support gather or scatter either:
+        # https://github.com/pytorch/pytorch/blob/v0.4.0/torch/lib/THD/base/data_channels/DataChannelNccl.cpp
+        _broadcast(rank, rows, columns)
+        _all_reduce(rank, rows, columns)
+        _reduce(rank, rows, columns)
+        _all_gather(rank, rows, columns)
+
+
+def save(result, model_dir):
+    filename = os.path.join(model_dir, result)
+    if not os.path.exists(filename):
+        logger.info("Saving success result")
+        with open(filename, 'w') as f:
+            f.write(result)
+
+
+if __name__ == '__main__':
+    main()
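
For context, a script like the one above is typically launched through the SageMaker Python SDK rather than run directly. A minimal sketch using the SDK 1.x estimator interface (the entry point name, role, instance types, and framework version below are illustrative assumptions, not part of this change):

    from sagemaker.pytorch import PyTorch

    # Illustrative launcher for the distributed-operations script above.
    estimator = PyTorch(entry_point='distributed_operations.py',  # assumed file name
                        role='SageMakerRole',                     # assumed IAM role
                        train_instance_count=2,
                        train_instance_type='ml.c4.xlarge',
                        framework_version='1.1.0',
                        hyperparameters={'backend': 'gloo', 'rows': 1, 'columns': 1})
    estimator.fit()
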
diff --git a/test/resources/fastai/cifar/cifar.py b/test/resources/fastai/cifar/cifar.py
new file mode 100644
index 00000000..fae2aa7c
--- /dev/null
+++ b/test/resources/fastai/cifar/cifar.py
@@ -0,0 +1,46 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+# Based on https://github.com/fastai/fastai/blob/master/examples/train_cifar.py
+# imports and code are kept as close as possible to the official example
+from fastai.script import *
+from fastai.vision import *
+from fastai.vision.models.wrn import wrn_22
+from fastai.distributed import *
+import torch
+
+torch.backends.cudnn.benchmark = True
+
+@call_parse
+def main(gpu:Param("GPU to run on", str)=None):
+    """Distributed training of CIFAR-10.
+    For the fastest speed, launch it as:
+    python -m fastai.launch cifar.py"""
+    gpu = setup_distrib(gpu)
+    n_gpus = int(os.environ.get("WORLD_SIZE", 1))
+    tgz_path = os.environ.get('SM_CHANNEL_TRAINING')
+    path = os.path.join(tgz_path, 'cifar10_tiny')
+    tarfile.open(f'{path}.tgz', 'r:gz').extractall(tgz_path)
+    ds_tfms = ([*rand_pad(4, 32), flip_lr(p=0.5)], [])
+    workers = min(16, num_cpus()//n_gpus)
+    data = ImageDataBunch.from_folder(path, valid='test', ds_tfms=ds_tfms, bs=512//n_gpus,
+                                      num_workers=workers)
+    data.normalize(cifar_stats)
+    learn = Learner(data, wrn_22(), metrics=accuracy, path='/opt/ml', model_dir='model')
+    if gpu is None:
+        learn.model = nn.DataParallel(learn.model)
+    else:
+        learn.distributed(gpu)
+    learn.to_fp16()
+    learn.fit_one_cycle(2, 3e-3, wd=0.4)
+    learn.save(name='model')
diff --git a/test/resources/fastai/cifar/train_cifar.py b/test/resources/fastai/cifar/train_cifar.py
new file mode 100644
index 00000000..febac46b
--- /dev/null
+++ b/test/resources/fastai/cifar/train_cifar.py
@@ -0,0 +1,6 @@
+
+# TODO: replace with a shell script when it's supported by the container
+import os
+
+# Using os.system instead of subprocess as an easier way to simulate a shell script call
+os.system('python -m fastai.launch cifar.py')
diff --git a/test/resources/fastai/cifar_tiny/training/cifar10_tiny.tgz b/test/resources/fastai/cifar_tiny/training/cifar10_tiny.tgz
new file mode 100644
index 00000000..7ce6d275
Binary files /dev/null and b/test/resources/fastai/cifar_tiny/training/cifar10_tiny.tgz differ
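
Since cifar.py resolves its data directory from the SM_CHANNEL_TRAINING environment variable, it can also be smoke-tested outside of SageMaker. The sketch below mirrors what train_cifar.py does inside the container; the data path is illustrative:

    import os

    # Point the script at a directory containing cifar10_tiny.tgz (illustrative path).
    os.environ['SM_CHANNEL_TRAINING'] = 'test/resources/fastai/cifar_tiny/training'
    os.system('python -m fastai.launch cifar.py')
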
diff --git a/test/resources/fastai/mnist.py b/test/resources/fastai/mnist.py
new file mode 100644
index 00000000..d2889480
--- /dev/null
+++ b/test/resources/fastai/mnist.py
@@ -0,0 +1,29 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+# Based on https://github.com/fastai/fastai/blob/master/examples/train_mnist.py
+# imports and code are kept as close as possible to the official example
+from fastai.script import *
+from fastai.vision import *
+
+@call_parse
+def main():
+    tgz_path = os.environ.get('SM_CHANNEL_TRAINING')
+    path = os.path.join(tgz_path, 'mnist_tiny')
+    tarfile.open(f'{path}.tgz', 'r:gz').extractall(tgz_path)
+    tfms = (rand_pad(2, 28), [])
+    data = ImageDataBunch.from_folder(path, ds_tfms=tfms, bs=64)
+    data.normalize(imagenet_stats)
+    learn = create_cnn(data, models.resnet18, metrics=accuracy, path='/opt/ml', model_dir='model')
+    learn.fit_one_cycle(1, 0.02)
+    learn.save(name='model')
diff --git a/test/resources/fastai/mnist_tiny/training/mnist_tiny.tgz b/test/resources/fastai/mnist_tiny/training/mnist_tiny.tgz
new file mode 100644
index 00000000..c24f3f03
Binary files /dev/null and b/test/resources/fastai/mnist_tiny/training/mnist_tiny.tgz differ
diff --git a/test/resources/mnist/data/training/MNIST/processed/test.pt b/test/resources/mnist/data/training/MNIST/processed/test.pt
new file mode 100644
index 00000000..1d2112e0
Binary files /dev/null and b/test/resources/mnist/data/training/MNIST/processed/test.pt differ
diff --git a/test/resources/mnist/data/training/MNIST/processed/training.pt b/test/resources/mnist/data/training/MNIST/processed/training.pt
new file mode 100644
index 00000000..7583094a
Binary files /dev/null and b/test/resources/mnist/data/training/MNIST/processed/training.pt differ
diff --git a/test/resources/mnist/data/training/model b/test/resources/mnist/data/training/model
new file mode 100644
index 00000000..393ea140
Binary files /dev/null and b/test/resources/mnist/data/training/model differ
diff --git a/test/resources/mnist/mnist.py b/test/resources/mnist/mnist.py
new file mode 100644
index 00000000..779c46ff
--- /dev/null
+++ b/test/resources/mnist/mnist.py
@@ -0,0 +1,227 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+import argparse
+import logging
+import os
+import sys
+
+import cv2 as cv
+import sagemaker_containers
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+import torch.utils.data
+import torch.utils.data.distributed
+from torchvision import datasets, transforms
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+logger.addHandler(logging.StreamHandler(sys.stdout))
+
+
+# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
+class Net(nn.Module):
+    def __init__(self):
+        logger.info("Create neural network module")
+
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.conv2_drop = nn.Dropout2d()
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = F.relu(F.max_pool2d(self.conv1(x), 2))
+        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+        x = x.view(-1, 320)
+        x = F.relu(self.fc1(x))
+        x = F.dropout(x, training=self.training)
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
+
+
+def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs):
+    logger.info("Get train data loader")
+    dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.1307,), (0.3081,))
+    ]))
+    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
+    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None,
+                                       sampler=train_sampler, **kwargs)
+
+
+def _get_test_data_loader(test_batch_size, training_dir, **kwargs):
+    logger.info("Get test data loader")
+    return torch.utils.data.DataLoader(
+        datasets.MNIST(training_dir, train=False, transform=transforms.Compose([
+            transforms.ToTensor(),
+            transforms.Normalize((0.1307,), (0.3081,))
+        ])),
+        batch_size=test_batch_size, shuffle=True, **kwargs)
+
+
+def _average_gradients(model):
+    # Gradient averaging.
+    size = float(dist.get_world_size())
+    for param in model.parameters():
+        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
+        param.grad.data /= size
+
+
+def train(args):
+    is_distributed = len(args.hosts) > 1 and args.backend is not None
+    logger.debug("Distributed training - {}".format(is_distributed))
+    use_cuda = (args.processor == 'gpu') or (args.num_gpus > 0)
+    logger.debug("Number of gpus available - {}".format(args.num_gpus))
+    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
+    device = torch.device("cuda" if use_cuda else "cpu")
+
+    if is_distributed:
+        # Initialize the distributed environment.
+        world_size = len(args.hosts)
+        os.environ['WORLD_SIZE'] = str(world_size)
+        host_rank = args.hosts.index(args.current_host)
+        os.environ['RANK'] = str(host_rank)
+        dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
+        logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(
+            args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
+            dist.get_rank(), args.num_gpus))
+
+    # set the seed for generating random numbers
+    torch.manual_seed(args.seed)
+    if use_cuda:
+        torch.cuda.manual_seed(args.seed)
+
+    train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
+    test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)
+
+    # TODO: assert the logs when we move to the SDK local mode
+    logger.debug("Processes {}/{} ({:.0f}%) of train data".format(
+        len(train_loader.sampler), len(train_loader.dataset),
+        100. * len(train_loader.sampler) / len(train_loader.dataset)
+    ))
+
+    logger.debug("Processes {}/{} ({:.0f}%) of test data".format(
+        len(test_loader.sampler), len(test_loader.dataset),
+        100. * len(test_loader.sampler) / len(test_loader.dataset)
+    ))
+
+    model = Net().to(device)
+    if is_distributed and use_cuda:
+        # multi-machine multi-gpu case
+        logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.")
+        model = torch.nn.parallel.DistributedDataParallel(model)
+    elif use_cuda:
+        # single-machine multi-gpu case
+        logger.debug("Single-machine multi-gpu: using DataParallel().cuda().")
+        model = torch.nn.DataParallel(model).to(device)
+    else:
+        # single-machine or multi-machine cpu case
+        logger.debug("Single-machine/multi-machine cpu: using DataParallel.")
+        model = torch.nn.DataParallel(model)
+
+    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
+
+    for epoch in range(1, args.epochs + 1):
+        model.train()
+        for batch_idx, (data, target) in enumerate(train_loader, 1):
+            data, target = data.to(device), target.to(device)
+            optimizer.zero_grad()
+            output = model(data)
+            loss = F.nll_loss(output, target)
+            loss.backward()
+            if is_distributed and not use_cuda:
+                # average gradients manually for multi-machine cpu case only
+                _average_gradients(model)
+            optimizer.step()
+            if batch_idx % args.log_interval == 0:
+                logger.debug('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
+                    epoch, batch_idx * len(data), len(train_loader.sampler),
+                    100. * batch_idx / len(train_loader), loss.item()))
+        test(model, test_loader, device)
+    save_model(model, args.model_dir)
+
+
+def test(model, test_loader, device):
+    model.eval()
+    test_loss = 0
+    correct = 0
+    with torch.no_grad():
+        for data, target in test_loader:
+            data, target = data.to(device), target.to(device)
+            output = model(data)
+            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
+            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+            correct += pred.eq(target.view_as(pred)).sum().item()
+
+    test_loss /= len(test_loader.dataset)
+    logger.debug('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
+        test_loss, correct, len(test_loader.dataset),
+        100. * correct / len(test_loader.dataset)))
+
+
+def model_fn(model_dir):
+    logger.info('model_fn')
+    model = torch.nn.DataParallel(Net())
+    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
+        model.load_state_dict(torch.load(f))
+    return model
+
+
+def save_model(model, model_dir):
+    logger.info("Saving the model.")
+    path = os.path.join(model_dir, 'model.pth')
+    # recommended way from http://pytorch.org/docs/master/notes/serialization.html
+    torch.save(model.state_dict(), path)
+
+
+if __name__ == '__main__':
+    # test opencv
+    print(cv.__version__)
+
+    parser = argparse.ArgumentParser()
+
+    # Data and model checkpoints directories
+    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
+                        help='input batch size for training (default: 64)')
+    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
+                        help='input batch size for testing (default: 1000)')
+    parser.add_argument('--epochs', type=int, default=1, metavar='N',
+                        help='number of epochs to train (default: 1)')
+    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
+                        help='learning rate (default: 0.01)')
+    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
+                        help='SGD momentum (default: 0.5)')
+    parser.add_argument('--seed', type=int, default=1, metavar='S',
+                        help='random seed (default: 1)')
+    parser.add_argument('--log-interval', type=int, default=100, metavar='N',
+                        help='how many batches to wait before logging training status')
+    parser.add_argument('--backend', type=str, default=None,
+                        help='backend for distributed training')
+    parser.add_argument('--processor', type=str, default='cpu',
+                        help='processor type used for training (cpu or gpu)')
+
+    # Container environment
+    env = sagemaker_containers.training_env()
+    parser.add_argument('--hosts', type=list, default=env.hosts)
+    parser.add_argument('--current-host', type=str, default=env.current_host)
+    parser.add_argument('--model-dir', type=str, default=env.model_dir)
+    parser.add_argument('--data-dir', type=str, default=env.channel_input_dirs['training'])
+    parser.add_argument('--num-gpus', type=int, default=env.num_gpus)
+
+    train(parser.parse_args())
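
The model_fn above is the hook the serving container calls to deserialize the artifact written by save_model. A minimal local sanity check might look like the following sketch (the module and directory names are illustrative):

    import torch
    from mnist import model_fn           # assumes mnist.py is importable from cwd

    model = model_fn('/opt/ml/model')    # expects model.pth written by save_model
    model.eval()
    example = torch.zeros(1, 1, 28, 28)  # one blank MNIST-sized image
    with torch.no_grad():
        print(model(example).argmax(dim=1))  # predicted digit
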
diff --git a/test/resources/mnist/mnist_1d.py b/test/resources/mnist/mnist_1d.py
new file mode 100644
index 00000000..40246b01
--- /dev/null
+++ b/test/resources/mnist/mnist_1d.py
@@ -0,0 +1,47 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+import os
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.utils.data
+import torch.utils.data.distributed
+
+
+# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.conv2_drop = nn.Dropout2d()
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = x.view(x.shape[0], 1, 28, 28)
+        x = F.relu(F.max_pool2d(self.conv1(x), 2))
+        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+        x = x.view(-1, 320)
+        x = F.relu(self.fc1(x))
+        x = F.dropout(x, training=self.training)
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
+
+
+def model_fn(model_dir):
+    model = torch.nn.DataParallel(Net())
+    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
+        model.load_state_dict(torch.load(f))
+    return model
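
The 1-D variant differs from mnist.py only in that its forward() reshapes flattened input back to (N, 1, 28, 28), so a serving request can carry plain 784-element vectors. An illustrative check:

    import torch
    from mnist_1d import Net   # assumes mnist_1d.py is importable from cwd

    net = Net()
    flat = torch.zeros(2, 784)  # two flattened 28x28 images
    print(net(flat).shape)      # torch.Size([2, 10])
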
diff --git a/test/resources/mnist/model_cpu/1d/model.tar.gz b/test/resources/mnist/model_cpu/1d/model.tar.gz
new file mode 100644
index 00000000..4ca07729
Binary files /dev/null and b/test/resources/mnist/model_cpu/1d/model.tar.gz differ
diff --git a/test/resources/mnist/model_cpu/model.tar.gz b/test/resources/mnist/model_cpu/model.tar.gz
new file mode 100644
index 00000000..f075782e
Binary files /dev/null and b/test/resources/mnist/model_cpu/model.tar.gz differ
diff --git a/test/resources/mnist/model_gpu/model.tar.gz b/test/resources/mnist/model_gpu/model.tar.gz
new file mode 100644
index 00000000..ed39a7a8
Binary files /dev/null and b/test/resources/mnist/model_gpu/model.tar.gz differ
diff --git a/test/unit/test_serving.py b/test/unit/test_serving.py
new file mode 100644
index 00000000..4a06e179
--- /dev/null
+++ b/test/unit/test_serving.py
@@ -0,0 +1,197 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import csv
+import json
+
+import numpy as np
+import pytest
+import torch
+import torch.nn as nn
+from mock import MagicMock
+from mock import patch
+from sagemaker_containers.beta.framework import content_types, errors
+from six import StringIO, BytesIO
+from torch.autograd import Variable
+
+from sagemaker_pytorch_serving_container.serving import main, default_model_fn, default_input_fn
+from sagemaker_pytorch_serving_container.serving import default_predict_fn, default_output_fn
+
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+
+class DummyModel(nn.Module):
+    def __init__(self):
+        super(DummyModel, self).__init__()
+
+    def forward(self, x):
+        pass
+
+    def __call__(self, tensor):
+        return 3 * tensor
+
+
+@pytest.fixture(scope='session', name='tensor')
+def fixture_tensor():
+    tensor = torch.rand(5, 10, 7, 9)
+    return tensor.to(device)
+
+
+def test_default_model_fn():
+    with pytest.raises(NotImplementedError):
+        default_model_fn('model_dir')
+
+
+def test_default_input_fn_json(tensor):
+    json_data = json.dumps(tensor.cpu().numpy().tolist())
+    deserialized_np_array = default_input_fn(json_data, content_types.JSON)
+
+    assert deserialized_np_array.is_cuda == torch.cuda.is_available()
+    assert torch.equal(tensor, deserialized_np_array)
+
+
+def test_default_input_fn_csv():
+    array = [[1, 2, 3], [4, 5, 6]]
+    str_io = StringIO()
+    csv.writer(str_io, delimiter=',').writerows(array)
+
+    deserialized_np_array = default_input_fn(str_io.getvalue(), content_types.CSV)
+
+    tensor = torch.FloatTensor(array).to(device)
+    assert torch.equal(tensor, deserialized_np_array)
+    assert deserialized_np_array.is_cuda == torch.cuda.is_available()
+
+
+def test_default_input_fn_csv_bad_columns():
+    str_io = StringIO()
+    csv_writer = csv.writer(str_io, delimiter=',')
+    csv_writer.writerow([1, 2, 3])
+    csv_writer.writerow([1, 2, 3, 4])
+
+    with pytest.raises(ValueError):
+        default_input_fn(str_io.getvalue(), content_types.CSV)
+
+
+def test_default_input_fn_npy(tensor):
+    stream = BytesIO()
+    np.save(stream, tensor.cpu().numpy())
+    deserialized_np_array = default_input_fn(stream.getvalue(), content_types.NPY)
+
+    assert deserialized_np_array.is_cuda == torch.cuda.is_available()
+    assert torch.equal(tensor, deserialized_np_array)
+
+
+def test_default_input_fn_bad_content_type():
+    with pytest.raises(errors.UnsupportedFormatError):
+        default_input_fn('', 'application/not_supported')
+
+
+def test_default_predict_fn(tensor):
+    model = DummyModel()
+    prediction = default_predict_fn(tensor, model)
+    assert torch.equal(model(Variable(tensor)), prediction)
+    assert prediction.is_cuda == torch.cuda.is_available()
+
+
+def test_default_predict_fn_cpu_cpu(tensor):
+    prediction = default_predict_fn(tensor.cpu(), DummyModel().cpu())
+
+    model = DummyModel().to(device)
+    assert torch.equal(model(Variable(tensor)), prediction)
+    assert prediction.is_cuda == torch.cuda.is_available()
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
+def test_default_predict_fn_cpu_gpu(tensor):
+    model = DummyModel().cuda()
+    prediction = default_predict_fn(tensor.cpu(), model)
+    assert torch.equal(model(tensor), prediction)
+    assert prediction.is_cuda is True
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
+def test_default_predict_fn_gpu_cpu(tensor):
+    prediction = default_predict_fn(tensor.cuda(), DummyModel().cpu())
+    model = DummyModel().cuda()
+    assert torch.equal(model(tensor), prediction)
+    assert prediction.is_cuda is True
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
+def test_default_predict_fn_gpu_gpu(tensor):
+    tensor = tensor.cuda()
+    model = DummyModel().cuda()
+    prediction = default_predict_fn(tensor, model)
+    assert torch.equal(model(tensor), prediction)
+    assert prediction.is_cuda is True
+
+
+def test_default_output_fn_json(tensor):
+    output = default_output_fn(tensor, content_types.JSON)
+
+    assert json.dumps(tensor.cpu().numpy().tolist()) in output.get_data(as_text=True)
+    assert content_types.JSON == output.mimetype
+
+
+def test_default_output_fn_npy(tensor):
+    output = default_output_fn(tensor, content_types.NPY)
+
+    stream = BytesIO()
+    np.save(stream, tensor.cpu().numpy())
+
+    assert stream.getvalue() in output.get_data(as_text=False)
+    assert content_types.NPY == output.mimetype
+
+
+def test_default_output_fn_csv_long():
+    tensor = torch.LongTensor([[1, 2, 3], [4, 5, 6]])
+    output = default_output_fn(tensor, content_types.CSV)
+
+    assert '1,2,3\n4,5,6\n' in output.get_data(as_text=True)
+    assert content_types.CSV == output.mimetype
+
+
+def test_default_output_fn_csv_float():
+    tensor = torch.FloatTensor([[1, 2, 3], [4, 5, 6]])
+    output = default_output_fn(tensor, content_types.CSV)
+
+    assert '1.0,2.0,3.0\n4.0,5.0,6.0\n' in output.get_data(as_text=True)
+    assert content_types.CSV == output.mimetype
+
+
+def test_default_output_fn_bad_accept():
+    with pytest.raises(errors.UnsupportedFormatError):
+        default_output_fn('', 'application/not_supported')
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
+def test_default_output_fn_gpu():
+    tensor_gpu = torch.LongTensor([[1, 2, 3], [4, 5, 6]]).cuda()
+
+    output = default_output_fn(tensor_gpu, content_types.CSV)
+
+    assert '1,2,3\n4,5,6\n' in output.get_data(as_text=True)
+    assert content_types.CSV == output.mimetype
+
+
+@patch('sagemaker_containers.beta.framework.modules.import_module')
+@patch('sagemaker_containers.beta.framework.worker.Worker')
+@patch('sagemaker_containers.beta.framework.transformer.Transformer.initialize')
+@patch('sagemaker_containers.beta.framework.env.ServingEnv', MagicMock())
+def test_hosting_start(mock_transformer_init, mock_worker, mock_import_module):
+    environ = MagicMock()
+    start_response = MagicMock()
+    main(environ, start_response)
+    mock_transformer_init.assert_called()
+    mock_worker.return_value.assert_called_with(environ, start_response)
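
As test_hosting_start exercises, serving.main takes (environ, start_response), i.e. it behaves as a WSGI application. A hypothetical standalone harness, shown only to illustrate the interface (the container fronts this app with its own server):

    from wsgiref.simple_server import make_server

    from sagemaker_pytorch_serving_container import serving

    # Serve the WSGI app directly on port 8080 (illustrative only).
    make_server('', 8080, serving.main).serve_forever()
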
diff --git a/test/utils/__init__.py b/test/utils/__init__.py
new file mode 100644
index 00000000..74f14335
--- /dev/null
+++ b/test/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
diff --git a/test/utils/image_utils.py b/test/utils/image_utils.py
new file mode 100644
index 00000000..7fc50c68
--- /dev/null
+++ b/test/utils/image_utils.py
@@ -0,0 +1,66 @@
+# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import os
+import subprocess
+import sys
+
+CYAN_COLOR = '\033[36m'
+END_COLOR = '\033[0m'
+
+
+def build_base_image(framework_name, framework_version, processor, base_image_tag, cwd='.'):
+    base_image_uri = get_base_image_uri(framework_name, base_image_tag)
+
+    dockerfile_location = os.path.join('docker', framework_version, 'base',
+                                       'Dockerfile.{}'.format(processor))
+
+    subprocess.check_call(['docker', 'build', '-t', base_image_uri,
+                           '-f', dockerfile_location, cwd], cwd=cwd)
+    print('created image {}'.format(base_image_uri))
+    return base_image_uri
+
+
+def build_image(framework_name, framework_version, processor, tag, cwd='.'):
+    _check_call('python setup.py bdist_wheel')
+
+    image_uri = get_image_uri(framework_name, tag)
+
+    dockerfile_location = os.path.join('docker', framework_version, 'final',
+                                       'Dockerfile.{}'.format(processor))
+
+    subprocess.check_call(
+        ['docker', 'build', '-t', image_uri, '-f', dockerfile_location, cwd], cwd=cwd)
+    print('created image {}'.format(image_uri))
+    return image_uri
+
+
+def get_base_image_uri(framework_name, base_image_tag):
+    return '{}-base:{}'.format(framework_name, base_image_tag)
+
+
+def get_image_uri(framework_name, tag):
+    return '{}:{}'.format(framework_name, tag)
+
+
+def _check_call(cmd, *popenargs, **kwargs):
+    if isinstance(cmd, str):
+        cmd = cmd.split(" ")
+    _print_cmd(cmd)
+    subprocess.check_call(cmd, *popenargs, **kwargs)
+
+
+def _print_cmd(cmd):
+    print('executing docker command: {}{}{}'.format(CYAN_COLOR, ' '.join(cmd), END_COLOR))
+    sys.stdout.flush()
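
These helpers are what the integration tests use to build the base and final images described by the Dockerfiles. An illustrative invocation, where the framework version and tags are assumptions rather than values fixed by this change:

    from test.utils import image_utils

    # Build a CPU base image, then the final image layered on top of it.
    image_utils.build_base_image('pytorch', '1.1.0', 'cpu', '1.1.0-cpu-py3', cwd='.')
    image_utils.build_image('preprod-pytorch', '1.1.0', 'cpu', '1.1.0-cpu-py3', cwd='.')
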
diff --git a/test/utils/local_mode_utils.py b/test/utils/local_mode_utils.py
new file mode 100644
index 00000000..b5f77159
--- /dev/null
+++ b/test/utils/local_mode_utils.py
@@ -0,0 +1,46 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+from contextlib import contextmanager
+import fcntl
+import os
+import tarfile
+import time
+
+from test.integration import resources_path
+
+LOCK_PATH = os.path.join(resources_path, 'local_mode_lock')
+
+
+@contextmanager
+def lock():
+    # Since Local Mode uses the same port for serving, we need a lock in order
+    # to allow concurrent test execution.
+    local_mode_lock_fd = open(LOCK_PATH, 'w')
+    local_mode_lock = local_mode_lock_fd.fileno()
+
+    fcntl.lockf(local_mode_lock, fcntl.LOCK_EX)
+
+    try:
+        yield
+    finally:
+        time.sleep(5)
+        fcntl.lockf(local_mode_lock, fcntl.LOCK_UN)
+
+
+def assert_files_exist(output_path, directory_file_map):
+    for directory, files in directory_file_map.items():
+        with tarfile.open(os.path.join(output_path, '{}.tar.gz'.format(directory))) as tar:
+            for f in files:
+                tar.getmember(f)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 00000000..979b5935
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,70 @@
+# Tox (http://tox.testrun.org/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions. To use it, "pip install tox"
+# and then run "tox" from this directory.
+
+[tox]
+envlist = py36,flake8
+skip_missing_interpreters = False
+
+[flake8]
+max-line-length = 120
+exclude =
+    build/
+    .git
+    __pycache__
+    .tox
+    test/resources/
+    lib/
+max-complexity = 10
+ignore =
+    FI10,
+    FI12,
+    FI13,
+    FI14,
+    FI15,
+    FI16,
+    FI17,
+    FI50,
+    FI51,
+    FI52,
+    FI53,
+    FI54,
+    FI55,
+    FI56,
+    FI57
+require-code = True
+
+[testenv]
+# {posargs} can be passed in by additional arguments specified when invoking tox.
+# Can be used to specify which tests to run, e.g.: tox -- -s
+passenv =
+    AWS_ACCESS_KEY_ID
+    AWS_SECRET_ACCESS_KEY
+    AWS_SESSION_TOKEN
+    AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
+    AWS_DEFAULT_REGION
+commands =
+    coverage run --source sagemaker_pytorch_serving_container -m py.test test/unit {posargs}
+    coverage report
+deps =
+    coverage
+    pytest
+    pytest-cov
+    pytest-xdist
+    mock
+    requests == 2.20.0
+    urllib3 < 1.23, >= 1.21
+    sagemaker
+    sagemaker-containers
+    torch
+    torchvision
+    retrying
+    six
+
+[testenv:flake8]
+basepython = python
+deps =
+    flake8
+    flake8-future-import
+commands = flake8
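
Because the py.test command above forwards {posargs}, individual tests can be selected when invoking tox, for example:

    tox -e py36 -- -k test_default_input_fn_json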