diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..2801db95 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +application_import_names = image_utils, integration, local_mode_utils, sagemaker_pytorch_container, test, test-toolkit, timeout, utils +import-order-style = google diff --git a/buildspec-release.yml b/buildspec-release.yml index b404bb8a..698c7945 100644 --- a/buildspec-release.yml +++ b/buildspec-release.yml @@ -12,7 +12,7 @@ phases: # run unit tests - AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_SESSION_TOKEN= AWS_CONTAINER_CREDENTIALS_RELATIVE_URI= AWS_DEFAULT_REGION= - tox -e py27,py36 -- test/unit + tox -e py36 -- test/unit # run local integ tests #- $(aws ecr get-login --no-include-email --region us-west-2) diff --git a/buildspec-toolkit.yml b/buildspec-toolkit.yml new file mode 100644 index 00000000..a1a2f8ed --- /dev/null +++ b/buildspec-toolkit.yml @@ -0,0 +1,102 @@ +version: 0.2 + +env: + variables: + FRAMEWORK_VERSION: '1.4.0' + EIA_FRAMEWORK_VERSION: '1.3.1' + CPU_INSTANCE_TYPE: 'ml.c4.xlarge' + GPU_INSTANCE_TYPE: 'ml.p2.xlarge' + EIA_ACCELERATOR_TYPE: 'ml.eia2.medium' + ECR_REPO: 'sagemaker-test' + GITHUB_REPO: 'sagemaker-pytorch-serving-container' + DLC_ACCOUNT: '763104351884' + SETUP_FILE: 'setup_cmds.sh' + SETUP_CMDS: '#!/bin/bash\npip install --upgrade pip\npip install -U -e .\npip install -U -e .[test]' + +phases: + pre_build: + commands: + - start-dockerd + - ACCOUNT=$(aws --region $AWS_DEFAULT_REGION sts --endpoint-url https://sts.$AWS_DEFAULT_REGION.amazonaws.com get-caller-identity --query 'Account' --output text) + - PREPROD_IMAGE="$ACCOUNT.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$ECR_REPO" + - PR_NUM=$(echo $CODEBUILD_SOURCE_VERSION | grep -o '[0-9]\+') + - BUILD_ID="$(echo $CODEBUILD_BUILD_ID | sed -e 's/:/-/g')" + - echo 'Pull request number:' $PR_NUM '. No value means this build is not from pull request.' + + build: + commands: + - TOX_PARALLEL_NO_SPINNER=1 + - PY_COLORS=0 + + # install + - pip3 install -U -e .[test] + + # run linters + - tox -e flake8,twine + + # run unit tests + - tox -e py36 test-toolkit/unit + + # define tags + - GENERIC_TAG="$FRAMEWORK_VERSION-pytorch-$BUILD_ID" + - DLC_CPU_TAG="$FRAMEWORK_VERSION-dlc-cpu-$BUILD_ID" + - DLC_GPU_TAG="$FRAMEWORK_VERSION-dlc-gpu-$BUILD_ID" + - DLC_EIA_TAG="$FRAMEWORK_VERSION-dlc-eia-$BUILD_ID" + + # run local CPU integration tests (build and push the image to ECR repo) + - test_cmd="pytest test-toolkit/integration/local --build-image --push-image --dockerfile-type pytorch --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor cpu --tag $GENERIC_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + - test_cmd="pytest test-toolkit/integration/local --build-image --push-image --dockerfile-type dlc.cpu --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor cpu --tag $DLC_CPU_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + + # launch remote GPU instance + - prefix='ml.' + - instance_type=${GPU_INSTANCE_TYPE#"$prefix"} + - create-key-pair + - launch-ec2-instance --instance-type $instance_type --ami-name dlami-ubuntu-latest + + # build DLC GPU image because the base DLC image is too big and takes too long to build as part of the test + - python3 setup.py sdist + - build_dir="test-toolkit/docker/$FRAMEWORK_VERSION" + - $(aws ecr get-login --registry-ids $DLC_ACCOUNT --no-include-email --region $AWS_DEFAULT_REGION) + - docker build -f "$build_dir/Dockerfile.dlc.gpu" -t $PREPROD_IMAGE:$DLC_GPU_TAG --build-arg region=$AWS_DEFAULT_REGION . + # push DLC GPU image to ECR + - $(aws ecr get-login --registry-ids $ACCOUNT --no-include-email --region $AWS_DEFAULT_REGION) + - docker push $PREPROD_IMAGE:$DLC_GPU_TAG + + # run GPU local integration tests + - printf "$SETUP_CMDS" > $SETUP_FILE + # no reason to rebuild the image again since it was already built and pushed to ECR during CPU tests + - generic_cmd="pytest test-toolkit/integration/local --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor gpu --tag $GENERIC_TAG" + - test_cmd="remote-test --github-repo $GITHUB_REPO --test-cmd \"$generic_cmd\" --setup-file $SETUP_FILE --pr-number \"$PR_NUM\"" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + - dlc_cmd="pytest test-toolkit/integration/local --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor gpu --tag $DLC_GPU_TAG" + - test_cmd="remote-test --github-repo $GITHUB_REPO --test-cmd \"$dlc_cmd\" --setup-file $SETUP_FILE --pr-number \"$PR_NUM\" --skip-setup" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + + # run CPU sagemaker integration tests + - test_cmd="pytest test-toolkit/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor cpu --instance-type $CPU_INSTANCE_TYPE --tag $GENERIC_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + - test_cmd="pytest test-toolkit/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor cpu --instance-type $CPU_INSTANCE_TYPE --tag $DLC_CPU_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + + # run GPU sagemaker integration tests + - test_cmd="pytest test-toolkit/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor gpu --instance-type $GPU_INSTANCE_TYPE --tag $GENERIC_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + - test_cmd="pytest test-toolkit/integration/sagemaker --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $FRAMEWORK_VERSION --processor gpu --instance-type $GPU_INSTANCE_TYPE --tag $DLC_GPU_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + + # run EIA sagemaker integration tests + - test_cmd="pytest test-toolkit/integration/sagemaker --build-image --push-image --dockerfile-type dlc.eia --region $AWS_DEFAULT_REGION --docker-base-name $ECR_REPO --aws-id $ACCOUNT --framework-version $EIA_FRAMEWORK_VERSION --processor cpu --instance-type $CPU_INSTANCE_TYPE --accelerator-type $EIA_ACCELERATOR_TYPE --tag $DLC_EIA_TAG" + - execute-command-if-has-matching-changes "$test_cmd" "test-toolkit/" "src/*.py" "setup.py" "setup.cfg" "buildspec-toolkit.yml" "docker/build_artifacts/*" + + finally: + # shut down remote GPU instance + - cleanup-gpu-instances + - cleanup-key-pairs + + # remove ECR image + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$GENERIC_TAG + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$DLC_CPU_TAG + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$DLC_GPU_TAG + - aws ecr batch-delete-image --repository-name $ECR_REPO --region $AWS_DEFAULT_REGION --image-ids imageTag=$DLC_EIA_TAG diff --git a/test-toolkit/conftest.py b/test-toolkit/conftest.py index 0fe0dcdc..d2f9fac8 100644 --- a/test-toolkit/conftest.py +++ b/test-toolkit/conftest.py @@ -18,13 +18,11 @@ import platform import pytest import shutil -import sys import tempfile from sagemaker import LocalSession, Session -from sagemaker.pytorch import PyTorch -from test.utils import image_utils +from utils import image_utils logger = logging.getLogger(__name__) logging.getLogger('boto').setLevel(logging.INFO) @@ -44,19 +42,36 @@ def pytest_addoption(parser): - parser.addoption('--build-image', '-D', action='store_true') - parser.addoption('--build-base-image', '-B', action='store_true') - parser.addoption('--aws-id') + parser.addoption('--build-image', '-B', action='store_true') + parser.addoption('--push-image', '-P', action='store_true') + parser.addoption('--dockerfile-type', '-T', + choices=['dlc.cpu', 'dlc.gpu', 'dlc.eia', 'pytorch'], + default='pytorch') + parser.addoption('--dockerfile', '-D', default=None) + parser.addoption('--aws-id', default=None) parser.addoption('--instance-type') - parser.addoption('--docker-base-name', default='pytorch') + parser.addoption('--accelerator-type') + parser.addoption('--docker-base-name', default='sagemaker-pytorch-inference') parser.addoption('--region', default='us-west-2') - parser.addoption('--framework-version', default=PyTorch.LATEST_VERSION) - parser.addoption('--py-version', choices=['2', '3'], default=str(sys.version_info.major)) + parser.addoption('--framework-version', default="1.4.0") + parser.addoption('--py-version', choices=['2', '3'], default='3') + # Processor is still "cpu" for EIA tests parser.addoption('--processor', choices=['gpu', 'cpu'], default='cpu') # If not specified, will default to {framework-version}-{processor}-py{py-version} parser.addoption('--tag', default=None) +@pytest.fixture(scope='session', name='dockerfile_type') +def fixture_dockerfile_type(request): + return request.config.getoption('--dockerfile-type') + + +@pytest.fixture(scope='session', name='dockerfile') +def fixture_dockerfile(request, dockerfile_type): + dockerfile = request.config.getoption('--dockerfile') + return dockerfile if dockerfile else 'Dockerfile.{}'.format(dockerfile_type) + + @pytest.fixture(scope='session', name='docker_base_name') def fixture_docker_base_name(request): return request.config.getoption('--docker-base-name') @@ -89,11 +104,6 @@ def fixture_tag(request, framework_version, processor, py_version): return provided_tag if provided_tag else default_tag -@pytest.fixture(scope='session', name='docker_image') -def fixture_docker_image(docker_base_name, tag): - return '{}:{}'.format(docker_base_name, tag) - - @pytest.fixture def opt_ml(): tmp = tempfile.mkdtemp() @@ -112,32 +122,25 @@ def fixture_use_gpu(processor): return processor == 'gpu' -@pytest.fixture(scope='session', name='build_base_image', autouse=True) -def fixture_build_base_image(request, framework_version, py_version, processor, tag, docker_base_name): - build_base_image = request.config.getoption('--build-base-image') - if build_base_image: - return image_utils.build_base_image(framework_name=docker_base_name, - framework_version=framework_version, - py_version=py_version, - base_image_tag=tag, - processor=processor, - cwd=os.path.join(dir_path, '..')) - - return tag - - @pytest.fixture(scope='session', name='build_image', autouse=True) -def fixture_build_image(request, framework_version, py_version, processor, tag, docker_base_name): +def fixture_build_image(request, framework_version, dockerfile, image_uri, region): build_image = request.config.getoption('--build-image') if build_image: - return image_utils.build_image(framework_name=docker_base_name, - framework_version=framework_version, - py_version=py_version, - processor=processor, - tag=tag, + return image_utils.build_image(framework_version=framework_version, + dockerfile=dockerfile, + image_uri=image_uri, + region=region, cwd=os.path.join(dir_path, '..')) - return tag + return image_uri + + +@pytest.fixture(scope='session', name='push_image', autouse=True) +def fixture_push_image(request, image_uri, region, aws_id): + push_image = request.config.getoption('--push-image') + if push_image: + return image_utils.push_image(image_uri, region, aws_id) + return None @pytest.fixture(scope='session', name='sagemaker_session') @@ -162,32 +165,80 @@ def fixture_instance_type(request, processor): return provided_instance_type or default_instance_type +@pytest.fixture(name='accelerator_type', scope='session') +def fixture_accelerator_type(request): + return request.config.getoption('--accelerator-type') + + @pytest.fixture(name='docker_registry', scope='session') def fixture_docker_registry(aws_id, region): - return '{}.dkr.ecr.{}.amazonaws.com'.format(aws_id, region) + return '{}.dkr.ecr.{}.amazonaws.com'.format(aws_id, region) if aws_id else None + + +@pytest.fixture(name='image_uri', scope='session') +def fixture_image_uri(docker_registry, docker_base_name, tag): + if docker_registry: + return '{}/{}:{}'.format(docker_registry, docker_base_name, tag) + return '{}:{}'.format(docker_base_name, tag) + + +@pytest.fixture(scope='session', name='dist_cpu_backend', params=['gloo']) +def fixture_dist_cpu_backend(request): + return request.param -@pytest.fixture(name='ecr_image', scope='session') -def fixture_ecr_image(docker_registry, docker_base_name, tag): - return '{}/{}:{}'.format(docker_registry, docker_base_name, tag) +@pytest.fixture(scope='session', name='dist_gpu_backend', params=['gloo', 'nccl']) +def fixture_dist_gpu_backend(request): + return request.param @pytest.fixture(autouse=True) -def skip_by_device_type(request, use_gpu, instance_type): +def skip_by_device_type(request, use_gpu, instance_type, accelerator_type): is_gpu = use_gpu or instance_type[3] in ['g', 'p'] - if (request.node.get_closest_marker('skip_gpu') and is_gpu) or \ - (request.node.get_closest_marker('skip_cpu') and not is_gpu): + is_eia = accelerator_type is not None + + # Separate out cases for clearer logic. + # When running GPU test, skip CPU test. When running CPU test, skip GPU test. + if (request.node.get_closest_marker('gpu_test') and not is_gpu) or \ + (request.node.get_closest_marker('cpu_test') and is_gpu): + pytest.skip('Skipping because running on \'{}\' instance'.format(instance_type)) + + # When running EIA test, skip the CPU and GPU functions + elif (request.node.get_closest_marker('gpu_test') or request.node.get_closest_marker('cpu_test')) and is_eia: + pytest.skip('Skipping because running on \'{}\' instance'.format(instance_type)) + + # When running CPU or GPU test, skip EIA test. + elif request.node.get_closest_marker('eia_test') and not is_eia: pytest.skip('Skipping because running on \'{}\' instance'.format(instance_type)) @pytest.fixture(autouse=True) def skip_by_py_version(request, py_version): + """ + This will cause tests to be skipped w/ py3 containers if "py-version" flag is not set + and pytest is running from py2. We can rely on this when py2 is deprecated, but for now + we must use "skip_py2_containers" + """ if request.node.get_closest_marker('skip_py2') and py_version != 'py3': pytest.skip('Skipping the test because Python 2 is not supported.') +@pytest.fixture(autouse=True) +def skip_test_in_region(request, region): + if request.node.get_closest_marker('skip_test_in_region'): + if region == 'me-south-1': + pytest.skip('Skipping SageMaker test in region {}'.format(region)) + + @pytest.fixture(autouse=True) def skip_gpu_instance_restricted_regions(region, instance_type): - if (region in NO_P2_REGIONS and instance_type.startswith('ml.p2')) \ - or (region in NO_P3_REGIONS and instance_type.startswith('ml.p3')): + if ((region in NO_P2_REGIONS and instance_type.startswith('ml.p2')) + or (region in NO_P3_REGIONS and instance_type.startswith('ml.p3'))): pytest.skip('Skipping GPU test in region {}'.format(region)) + + +@pytest.fixture(autouse=True) +def skip_py2_containers(request, tag): + if request.node.get_closest_marker('skip_py2_containers'): + if 'py2' in tag: + pytest.skip('Skipping python2 container with tag {}'.format(tag)) diff --git a/test-toolkit/docker/1.3.1/Dockerfile.dlc.eia b/test-toolkit/docker/1.3.1/Dockerfile.dlc.eia new file mode 100644 index 00000000..ce2dbcb4 --- /dev/null +++ b/test-toolkit/docker/1.3.1/Dockerfile.dlc.eia @@ -0,0 +1,6 @@ +ARG region +FROM 763104351884.dkr.ecr.$region.amazonaws.com/pytorch-inference-eia:1.3.1-cpu-py3 + +COPY dist/sagemaker_pytorch_inference-*.tar.gz /sagemaker_pytorch_inference.tar.gz +RUN pip install --upgrade --no-cache-dir /sagemaker_pytorch_inference.tar.gz && \ + rm /sagemaker_pytorch_inference.tar.gz diff --git a/test-toolkit/docker/1.4.0/Dockerfile.dlc.cpu b/test-toolkit/docker/1.4.0/Dockerfile.dlc.cpu new file mode 100644 index 00000000..22b2a139 --- /dev/null +++ b/test-toolkit/docker/1.4.0/Dockerfile.dlc.cpu @@ -0,0 +1,6 @@ +ARG region +FROM 763104351884.dkr.ecr.$region.amazonaws.com/pytorch-inference:1.4.0-cpu-py3 + +COPY dist/sagemaker_pytorch_inference-*.tar.gz /sagemaker_pytorch_inference.tar.gz +RUN pip install --upgrade --no-cache-dir /sagemaker_pytorch_inference.tar.gz && \ + rm /sagemaker_pytorch_inference.tar.gz diff --git a/test-toolkit/docker/1.4.0/Dockerfile.dlc.gpu b/test-toolkit/docker/1.4.0/Dockerfile.dlc.gpu new file mode 100644 index 00000000..2189c646 --- /dev/null +++ b/test-toolkit/docker/1.4.0/Dockerfile.dlc.gpu @@ -0,0 +1,6 @@ +ARG region +FROM 763104351884.dkr.ecr.$region.amazonaws.com/pytorch-inference:1.4.0-gpu-py3 + +COPY dist/sagemaker_pytorch_inference-*.tar.gz /sagemaker_pytorch_inference.tar.gz +RUN pip install --upgrade --no-cache-dir /sagemaker_pytorch_inference.tar.gz && \ + rm /sagemaker_pytorch_inference.tar.gz diff --git a/test-toolkit/docker/1.4.0/Dockerfile.pytorch b/test-toolkit/docker/1.4.0/Dockerfile.pytorch new file mode 100644 index 00000000..a9da1ca3 --- /dev/null +++ b/test-toolkit/docker/1.4.0/Dockerfile.pytorch @@ -0,0 +1,41 @@ +FROM pytorch/pytorch:1.4-cuda10.1-cudnn7-runtime + +LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true +LABEL com.amazonaws.sagemaker.capabilities.multi-models=true + +ARG MMS_VERSION=1.0.8 + +ENV SAGEMAKER_SERVING_MODULE sagemaker_pytorch_serving_container.serving:main +ENV TEMP=/home/model-server/tmp + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + libgl1-mesa-glx \ + libglib2.0-0 \ + libsm6 \ + libxext6 \ + libxrender-dev \ + openjdk-8-jdk-headless \ + && rm -rf /var/lib/apt/lists/* + +RUN conda install -c conda-forge opencv==4.0.1 \ + && ln -s /opt/conda/bin/pip /usr/local/bin/pip3 + +RUN pip install mxnet-model-server==$MMS_VERSION + +COPY dist/sagemaker_pytorch_inference-*.tar.gz /sagemaker_pytorch_inference.tar.gz +RUN pip install --no-cache-dir /sagemaker_pytorch_inference.tar.gz && \ + rm /sagemaker_pytorch_inference.tar.gz + +RUN useradd -m model-server \ + && mkdir -p /home/model-server/tmp \ + && chown -R model-server /home/model-server + +COPY docker/build_artifacts/mms-entrypoint.py /usr/local/bin/dockerd-entrypoint.py +COPY docker/build_artifacts/config.properties /home/model-server + +RUN chmod +x /usr/local/bin/dockerd-entrypoint.py + +EXPOSE 8080 8081 +ENTRYPOINT ["python", "/usr/local/bin/dockerd-entrypoint.py"] +CMD ["mxnet-model-server", "--start", "--mms-config", "/home/model-server/config.properties"] diff --git a/test-toolkit/integration/__init__.py b/test-toolkit/integration/__init__.py index f8beb0d5..7904d0fd 100644 --- a/test-toolkit/integration/__init__.py +++ b/test-toolkit/integration/__init__.py @@ -16,16 +16,22 @@ resources_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'resources')) mnist_path = os.path.join(resources_path, 'mnist') -mnist_script = os.path.join(mnist_path, 'mnist.py') data_dir = os.path.join(mnist_path, 'data') training_dir = os.path.join(data_dir, 'training') +cpu_sub_dir = 'model_cpu' +gpu_sub_dir = 'model_gpu' +eia_sub_dir = 'model_eia' -model_cpu_dir = os.path.join(mnist_path, 'model_cpu') +model_cpu_dir = os.path.join(mnist_path, cpu_sub_dir) +mnist_cpu_script = os.path.join(model_cpu_dir, 'mnist.py') model_cpu_1d_dir = os.path.join(model_cpu_dir, '1d') mnist_1d_script = os.path.join(model_cpu_1d_dir, 'mnist_1d.py') -model_gpu_dir = os.path.join(mnist_path, 'model_gpu') +model_gpu_dir = os.path.join(mnist_path, gpu_sub_dir) +mnist_gpu_script = os.path.join(model_gpu_dir, 'mnist.py') model_gpu_1d_dir = os.path.join(model_gpu_dir, '1d') -call_model_fn_once_script = os.path.join(resources_path, 'call_model_fn_once.py') +model_eia_dir = os.path.join(mnist_path, eia_sub_dir) +mnist_eia_script = os.path.join(model_eia_dir, 'mnist.py') +call_model_fn_once_script = os.path.join(model_cpu_dir, 'call_model_fn_once.py') ROLE = 'dummy/unused-role' DEFAULT_TIMEOUT = 20 diff --git a/test-toolkit/integration/local/test_serving.py b/test-toolkit/integration/local/test_serving.py index 5c018958..f54c675f 100644 --- a/test-toolkit/integration/local/test_serving.py +++ b/test-toolkit/integration/local/test_serving.py @@ -25,9 +25,9 @@ from sagemaker_containers.beta.framework import content_types from torchvision import datasets, transforms -from test.integration import training_dir, mnist_script, mnist_1d_script, model_cpu_dir, \ - model_gpu_dir, model_cpu_1d_dir, call_model_fn_once_script, ROLE -from test.utils import local_mode_utils +from integration import training_dir, mnist_1d_script, model_cpu_dir, mnist_cpu_script, \ + model_gpu_dir, mnist_gpu_script, model_cpu_1d_dir, call_model_fn_once_script, ROLE +from utils import local_mode_utils CONTENT_TYPE_TO_SERIALIZER_MAP = { content_types.CSV: csv_serializer, @@ -48,31 +48,32 @@ def fixture_test_loader(): return _get_test_data_loader(batch_size=300) -def test_serve_json_npy(test_loader, use_gpu, docker_image, sagemaker_local_session, instance_type): +def test_serve_json_npy(test_loader, use_gpu, image_uri, sagemaker_local_session, instance_type): model_dir = model_gpu_dir if use_gpu else model_cpu_dir - with _predictor(model_dir, mnist_script, docker_image, sagemaker_local_session, + mnist_script = mnist_gpu_script if use_gpu else mnist_cpu_script + with _predictor(model_dir, mnist_script, image_uri, sagemaker_local_session, instance_type) as predictor: for content_type in (content_types.JSON, content_types.NPY): for accept in (content_types.JSON, content_types.CSV, content_types.NPY): _assert_prediction_npy_json(predictor, test_loader, content_type, accept) -def test_serve_csv(test_loader, use_gpu, docker_image, sagemaker_local_session, instance_type): - with _predictor(model_cpu_1d_dir, mnist_1d_script, docker_image, sagemaker_local_session, +def test_serve_csv(test_loader, use_gpu, image_uri, sagemaker_local_session, instance_type): + with _predictor(model_cpu_1d_dir, mnist_1d_script, image_uri, sagemaker_local_session, instance_type) as predictor: for accept in (content_types.JSON, content_types.CSV, content_types.NPY): _assert_prediction_csv(predictor, test_loader, accept) @pytest.mark.skip_cpu -def test_serve_cpu_model_on_gpu(test_loader, docker_image, sagemaker_local_session, instance_type): - with _predictor(model_cpu_1d_dir, mnist_1d_script, docker_image, sagemaker_local_session, +def test_serve_cpu_model_on_gpu(test_loader, image_uri, sagemaker_local_session, instance_type): + with _predictor(model_cpu_1d_dir, mnist_1d_script, image_uri, sagemaker_local_session, instance_type) as predictor: _assert_prediction_npy_json(predictor, test_loader, content_types.NPY, content_types.JSON) -def test_serving_calls_model_fn_once(docker_image, sagemaker_local_session, instance_type): - with _predictor(model_cpu_dir, call_model_fn_once_script, docker_image, sagemaker_local_session, +def test_serving_calls_model_fn_once(image_uri, sagemaker_local_session, instance_type): + with _predictor(model_cpu_dir, call_model_fn_once_script, image_uri, sagemaker_local_session, instance_type, model_server_workers=2) as predictor: predictor.accept = None predictor.deserializer = BytesDeserializer() diff --git a/test-toolkit/integration/sagemaker/test_mnist.py b/test-toolkit/integration/sagemaker/test_mnist.py index 4bcc0638..9efb28c8 100644 --- a/test-toolkit/integration/sagemaker/test_mnist.py +++ b/test-toolkit/integration/sagemaker/test_mnist.py @@ -19,25 +19,36 @@ import sagemaker from sagemaker.pytorch import PyTorchModel -from test.integration import mnist_script, model_cpu_dir -from test.integration.sagemaker.timeout import timeout_and_delete_endpoint +from integration import model_cpu_dir, mnist_cpu_script, mnist_gpu_script, model_eia_dir, mnist_eia_script +from integration.sagemaker.timeout import timeout_and_delete_endpoint -@pytest.mark.skip_gpu -def test_mnist_distributed_cpu(sagemaker_session, ecr_image, instance_type): +@pytest.mark.cpu_test +def test_mnist_cpu(sagemaker_session, image_uri, instance_type): instance_type = instance_type or 'ml.c4.xlarge' - _test_mnist_distributed(sagemaker_session, ecr_image, instance_type) + model_dir = os.path.join(model_cpu_dir, 'model_mnist.tar.gz') + _test_mnist_distributed(sagemaker_session, image_uri, instance_type, model_dir, mnist_cpu_script) -@pytest.mark.skip_cpu -def test_mnist_distributed_gpu(sagemaker_session, ecr_image, instance_type): +@pytest.mark.gpu_test +def test_mnist_gpu(sagemaker_session, image_uri, instance_type): instance_type = instance_type or 'ml.p2.xlarge' - _test_mnist_distributed(sagemaker_session, ecr_image, instance_type) + model_dir = os.path.join(model_cpu_dir, 'model_mnist.tar.gz') + _test_mnist_distributed(sagemaker_session, image_uri, instance_type, model_dir, mnist_gpu_script) -def _test_mnist_distributed(sagemaker_session, ecr_image, instance_type): - model_dir = os.path.join(model_cpu_dir, 'model_mnist.tar.gz') +@pytest.mark.eia_test +def test_mnist_eia(sagemaker_session, image_uri, instance_type, accelerator_type): + instance_type = instance_type or 'ml.c4.xlarge' + # Scripted model is serialized with torch.jit.save(). + # Inference test for EIA doesn't need to instantiate model definition then load state_dict + model_dir = os.path.join(model_eia_dir, 'model_mnist.tar.gz') + _test_mnist_distributed(sagemaker_session, image_uri, instance_type, model_dir, mnist_eia_script, + accelerator_type=accelerator_type) + +def _test_mnist_distributed(sagemaker_session, image_uri, instance_type, model_dir, mnist_script, + accelerator_type=None): endpoint_name = sagemaker.utils.unique_name_from_base("sagemaker-pytorch-serving") model_data = sagemaker_session.upload_data( @@ -45,15 +56,17 @@ def _test_mnist_distributed(sagemaker_session, ecr_image, instance_type): key_prefix="sagemaker-pytorch-serving/models", ) - pytorch = PyTorchModel(model_data, - 'SageMakerRole', - mnist_script, - image=ecr_image, - sagemaker_session=sagemaker_session) + pytorch = PyTorchModel(model_data=model_data, role='SageMakerRole', entry_point=mnist_script, + image=image_uri, sagemaker_session=sagemaker_session) with timeout_and_delete_endpoint(endpoint_name, sagemaker_session, minutes=30): - predictor = pytorch.deploy(initial_instance_count=1, instance_type=instance_type, - endpoint_name=endpoint_name) + # Use accelerator type to differentiate EI vs. CPU and GPU. Don't use processor value + if accelerator_type is not None: + predictor = pytorch.deploy(initial_instance_count=1, instance_type=instance_type, + accelerator_type=accelerator_type, endpoint_name=endpoint_name) + else: + predictor = pytorch.deploy(initial_instance_count=1, instance_type=instance_type, + endpoint_name=endpoint_name) batch_size = 100 data = np.random.rand(batch_size, 1, 28, 28).astype(np.float32) diff --git a/test-toolkit/resources/local_mode_lock b/test-toolkit/resources/local_mode_lock new file mode 100644 index 00000000..e69de29b diff --git a/test-toolkit/resources/mnist/mnist.py b/test-toolkit/resources/mnist/mnist.py deleted file mode 100644 index 5d50b30a..00000000 --- a/test-toolkit/resources/mnist/mnist.py +++ /dev/null @@ -1,227 +0,0 @@ -# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). You -# may not use this file except in compliance with the License. A copy of -# the License is located at -# -# http://aws.amazon.com/apache2.0/ -# -# or in the "license" file accompanying this file. This file is -# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF -# ANY KIND, either express or implied. See the License for the specific -# language governing permissions and limitations under the License. -from __future__ import absolute_import -import argparse -import logging -import os -import sys - -import cv2 as cv -import sagemaker_containers -import torch -import torch.distributed as dist -import torch.nn as nn -import torch.nn.functional as F -import torch.optim as optim -import torch.utils.data -import torch.utils.data.distributed -from torchvision import datasets, transforms - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -logger.addHandler(logging.StreamHandler(sys.stdout)) - - -# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py -class Net(nn.Module): - def __init__(self): - logger.info("Create neural network module") - - super(Net, self).__init__() - self.conv1 = nn.Conv2d(1, 10, kernel_size=5) - self.conv2 = nn.Conv2d(10, 20, kernel_size=5) - self.conv2_drop = nn.Dropout2d() - self.fc1 = nn.Linear(320, 50) - self.fc2 = nn.Linear(50, 10) - - def forward(self, x): - x = F.relu(F.max_pool2d(self.conv1(x), 2)) - x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) - x = x.view(-1, 320) - x = F.relu(self.fc1(x)) - x = F.dropout(x, training=self.training) - x = self.fc2(x) - return F.log_softmax(x, dim=1) - - -def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs): - logger.info("Get train data loader") - dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])) - train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None - return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None, - sampler=train_sampler, **kwargs) - - -def _get_test_data_loader(test_batch_size, training_dir, **kwargs): - logger.info("Get test data loader") - return torch.utils.data.DataLoader( - datasets.MNIST(training_dir, train=False, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])), - batch_size=test_batch_size, shuffle=True, **kwargs) - - -def _average_gradients(model): - # Gradient averaging. - size = float(dist.get_world_size()) - for param in model.parameters(): - dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) - param.grad.data /= size - - -def train(args): - is_distributed = len(args.hosts) > 1 and args.backend is not None - logger.debug("Distributed training - {}".format(is_distributed)) - use_cuda = (args.processor == 'gpu') or (args.num_gpus > 0) - logger.debug("Number of gpus available - {}".format(args.num_gpus)) - kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {} - device = torch.device("cuda" if use_cuda else "cpu") - - if is_distributed: - # Initialize the distributed environment. - world_size = len(args.hosts) - os.environ['WORLD_SIZE'] = str(world_size) - host_rank = args.hosts.index(args.current_host) - os.environ['RANK'] = str(host_rank) - dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size) - logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format( - args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format( - dist.get_rank(), args.num_gpus)) - - # set the seed for generating random numbers - torch.manual_seed(args.seed) - if use_cuda: - torch.cuda.manual_seed(args.seed) - - train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs) - test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs) - - # TODO: assert the logs when we move to the SDK local mode - logger.debug("Processes {}/{} ({:.0f}%) of train data".format( - len(train_loader.sampler), len(train_loader.dataset), - 100. * len(train_loader.sampler) / len(train_loader.dataset) - )) - - logger.debug("Processes {}/{} ({:.0f}%) of test data".format( - len(test_loader.sampler), len(test_loader.dataset), - 100. * len(test_loader.sampler) / len(test_loader.dataset) - )) - - model = Net().to(device) - if is_distributed and use_cuda: - # multi-machine multi-gpu case - logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.") - model = torch.nn.parallel.DistributedDataParallel(model) - elif use_cuda: - # single-machine multi-gpu case - logger.debug("Single-machine multi-gpu: using DataParallel().cuda().") - model = torch.nn.DataParallel(model).to(device) - else: - # single-machine or multi-machine cpu case - logger.debug("Single-machine/multi-machine cpu: using DataParallel.") - model = torch.nn.DataParallel(model) - - optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) - - for epoch in range(1, args.epochs + 1): - model.train() - for batch_idx, (data, target) in enumerate(train_loader, 1): - data, target = data.to(device), target.to(device) - optimizer.zero_grad() - output = model(data) - loss = F.nll_loss(output, target) - loss.backward() - if is_distributed and not use_cuda: - # average gradients manually for multi-machine cpu case only - _average_gradients(model) - optimizer.step() - if batch_idx % args.log_interval == 0: - logger.debug('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( - epoch, batch_idx * len(data), len(train_loader.sampler), - 100. * batch_idx / len(train_loader), loss.item())) - test(model, test_loader, device) - save_model(model, args.model_dir) - - -def test(model, test_loader, device): - model.eval() - test_loss = 0 - correct = 0 - with torch.no_grad(): - for data, target in test_loader: - data, target = data.to(device), target.to(device) - output = model(data) - test_loss += F.nll_loss(output, target, size_average=None).item() # sum up batch loss - pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability - correct += pred.eq(target.view_as(pred)).sum().item() - - test_loss /= len(test_loader.dataset) - logger.debug('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( - test_loss, correct, len(test_loader.dataset), - 100. * correct / len(test_loader.dataset))) - - -def model_fn(model_dir): - logger.info('model_fn') - model = torch.nn.DataParallel(Net()) - with open(os.path.join(model_dir, 'model.pth'), 'rb') as f: - model.load_state_dict(torch.load(f)) - return model - - -def save_model(model, model_dir): - logger.info("Saving the model.") - path = os.path.join(model_dir, 'model.pth') - # recommended way from http://pytorch.org/docs/master/notes/serialization.html - torch.save(model.state_dict(), path) - - -if __name__ == '__main__': - # test opencv - print(cv.__version__) - - parser = argparse.ArgumentParser() - - # Data and model checkpoints directories - parser.add_argument('--batch-size', type=int, default=64, metavar='N', - help='input batch size for training (default: 64)') - parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', - help='input batch size for testing (default: 1000)') - parser.add_argument('--epochs', type=int, default=1, metavar='N', - help='number of epochs to train (default: 10)') - parser.add_argument('--lr', type=float, default=0.01, metavar='LR', - help='learning rate (default: 0.01)') - parser.add_argument('--momentum', type=float, default=0.5, metavar='M', - help='SGD momentum (default: 0.5)') - parser.add_argument('--seed', type=int, default=1, metavar='S', - help='random seed (default: 1)') - parser.add_argument('--log-interval', type=int, default=100, metavar='N', - help='how many batches to wait before logging training status') - parser.add_argument('--backend', type=str, default=None, - help='backend for distributed training') - parser.add_argument('--processor', type=str, default='cpu', - help='backend for distributed training') - - # Container environment - env = sagemaker_containers.training_env() - parser.add_argument('--hosts', type=list, default=env.hosts) - parser.add_argument('--current-host', type=str, default=env.current_host) - parser.add_argument('--model-dir', type=str, default=env.model_dir) - parser.add_argument('--data-dir', type=str, default=env.channel_input_dirs['training']) - parser.add_argument('--num-gpus', type=int, default=env.num_gpus) - - train(parser.parse_args()) diff --git a/test-toolkit/resources/mnist/model_cpu/1d/model.pth b/test-toolkit/resources/mnist/model_cpu/1d/model.pth new file mode 100644 index 00000000..126a0093 Binary files /dev/null and b/test-toolkit/resources/mnist/model_cpu/1d/model.pth differ diff --git a/test-toolkit/resources/mnist/model_cpu/mnist.py b/test-toolkit/resources/mnist/model_cpu/mnist.py index 5d50b30a..e0a1dfbe 100644 --- a/test-toolkit/resources/mnist/model_cpu/mnist.py +++ b/test-toolkit/resources/mnist/model_cpu/mnist.py @@ -11,21 +11,16 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from __future__ import absolute_import -import argparse + import logging import os import sys -import cv2 as cv -import sagemaker_containers import torch -import torch.distributed as dist import torch.nn as nn import torch.nn.functional as F -import torch.optim as optim import torch.utils.data import torch.utils.data.distributed -from torchvision import datasets, transforms logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -54,174 +49,9 @@ def forward(self, x): return F.log_softmax(x, dim=1) -def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs): - logger.info("Get train data loader") - dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])) - train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None - return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None, - sampler=train_sampler, **kwargs) - - -def _get_test_data_loader(test_batch_size, training_dir, **kwargs): - logger.info("Get test data loader") - return torch.utils.data.DataLoader( - datasets.MNIST(training_dir, train=False, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])), - batch_size=test_batch_size, shuffle=True, **kwargs) - - -def _average_gradients(model): - # Gradient averaging. - size = float(dist.get_world_size()) - for param in model.parameters(): - dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) - param.grad.data /= size - - -def train(args): - is_distributed = len(args.hosts) > 1 and args.backend is not None - logger.debug("Distributed training - {}".format(is_distributed)) - use_cuda = (args.processor == 'gpu') or (args.num_gpus > 0) - logger.debug("Number of gpus available - {}".format(args.num_gpus)) - kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {} - device = torch.device("cuda" if use_cuda else "cpu") - - if is_distributed: - # Initialize the distributed environment. - world_size = len(args.hosts) - os.environ['WORLD_SIZE'] = str(world_size) - host_rank = args.hosts.index(args.current_host) - os.environ['RANK'] = str(host_rank) - dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size) - logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format( - args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format( - dist.get_rank(), args.num_gpus)) - - # set the seed for generating random numbers - torch.manual_seed(args.seed) - if use_cuda: - torch.cuda.manual_seed(args.seed) - - train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs) - test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs) - - # TODO: assert the logs when we move to the SDK local mode - logger.debug("Processes {}/{} ({:.0f}%) of train data".format( - len(train_loader.sampler), len(train_loader.dataset), - 100. * len(train_loader.sampler) / len(train_loader.dataset) - )) - - logger.debug("Processes {}/{} ({:.0f}%) of test data".format( - len(test_loader.sampler), len(test_loader.dataset), - 100. * len(test_loader.sampler) / len(test_loader.dataset) - )) - - model = Net().to(device) - if is_distributed and use_cuda: - # multi-machine multi-gpu case - logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.") - model = torch.nn.parallel.DistributedDataParallel(model) - elif use_cuda: - # single-machine multi-gpu case - logger.debug("Single-machine multi-gpu: using DataParallel().cuda().") - model = torch.nn.DataParallel(model).to(device) - else: - # single-machine or multi-machine cpu case - logger.debug("Single-machine/multi-machine cpu: using DataParallel.") - model = torch.nn.DataParallel(model) - - optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) - - for epoch in range(1, args.epochs + 1): - model.train() - for batch_idx, (data, target) in enumerate(train_loader, 1): - data, target = data.to(device), target.to(device) - optimizer.zero_grad() - output = model(data) - loss = F.nll_loss(output, target) - loss.backward() - if is_distributed and not use_cuda: - # average gradients manually for multi-machine cpu case only - _average_gradients(model) - optimizer.step() - if batch_idx % args.log_interval == 0: - logger.debug('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( - epoch, batch_idx * len(data), len(train_loader.sampler), - 100. * batch_idx / len(train_loader), loss.item())) - test(model, test_loader, device) - save_model(model, args.model_dir) - - -def test(model, test_loader, device): - model.eval() - test_loss = 0 - correct = 0 - with torch.no_grad(): - for data, target in test_loader: - data, target = data.to(device), target.to(device) - output = model(data) - test_loss += F.nll_loss(output, target, size_average=None).item() # sum up batch loss - pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability - correct += pred.eq(target.view_as(pred)).sum().item() - - test_loss /= len(test_loader.dataset) - logger.debug('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( - test_loss, correct, len(test_loader.dataset), - 100. * correct / len(test_loader.dataset))) - - def model_fn(model_dir): logger.info('model_fn') model = torch.nn.DataParallel(Net()) with open(os.path.join(model_dir, 'model.pth'), 'rb') as f: model.load_state_dict(torch.load(f)) return model - - -def save_model(model, model_dir): - logger.info("Saving the model.") - path = os.path.join(model_dir, 'model.pth') - # recommended way from http://pytorch.org/docs/master/notes/serialization.html - torch.save(model.state_dict(), path) - - -if __name__ == '__main__': - # test opencv - print(cv.__version__) - - parser = argparse.ArgumentParser() - - # Data and model checkpoints directories - parser.add_argument('--batch-size', type=int, default=64, metavar='N', - help='input batch size for training (default: 64)') - parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', - help='input batch size for testing (default: 1000)') - parser.add_argument('--epochs', type=int, default=1, metavar='N', - help='number of epochs to train (default: 10)') - parser.add_argument('--lr', type=float, default=0.01, metavar='LR', - help='learning rate (default: 0.01)') - parser.add_argument('--momentum', type=float, default=0.5, metavar='M', - help='SGD momentum (default: 0.5)') - parser.add_argument('--seed', type=int, default=1, metavar='S', - help='random seed (default: 1)') - parser.add_argument('--log-interval', type=int, default=100, metavar='N', - help='how many batches to wait before logging training status') - parser.add_argument('--backend', type=str, default=None, - help='backend for distributed training') - parser.add_argument('--processor', type=str, default='cpu', - help='backend for distributed training') - - # Container environment - env = sagemaker_containers.training_env() - parser.add_argument('--hosts', type=list, default=env.hosts) - parser.add_argument('--current-host', type=str, default=env.current_host) - parser.add_argument('--model-dir', type=str, default=env.model_dir) - parser.add_argument('--data-dir', type=str, default=env.channel_input_dirs['training']) - parser.add_argument('--num-gpus', type=int, default=env.num_gpus) - - train(parser.parse_args()) diff --git a/test-toolkit/resources/mnist/model_cpu/model.pth b/test-toolkit/resources/mnist/model_cpu/model.pth new file mode 100644 index 00000000..393ea140 Binary files /dev/null and b/test-toolkit/resources/mnist/model_cpu/model.pth differ diff --git a/test-toolkit/resources/call_model_fn_once.py b/test-toolkit/resources/mnist/model_eia/mnist.py similarity index 53% rename from test-toolkit/resources/call_model_fn_once.py rename to test-toolkit/resources/mnist/model_eia/mnist.py index 1bbd3e27..d151a3f3 100644 --- a/test-toolkit/resources/call_model_fn_once.py +++ b/test-toolkit/resources/mnist/model_eia/mnist.py @@ -10,28 +10,4 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -from __future__ import absolute_import - -import os - - -def model_fn(model_dir): - lock_file = os.path.join(model_dir, 'model_fn.lock.{}'.format(os.getpid())) - if os.path.exists(lock_file): - raise RuntimeError('model_fn called more than once (lock: {})'.format(lock_file)) - - open(lock_file, 'a').close() - - return 'model' - - -def input_fn(data, content_type): - return data - - -def predict_fn(data, model): - return b'output' - - -def output_fn(prediction, accept): - return prediction +# This file is intentionally left blank to utilize default_model_fn and default_predict_fn diff --git a/test-toolkit/resources/mnist/model_eia/model.tar.gz b/test-toolkit/resources/mnist/model_eia/model.tar.gz new file mode 100644 index 00000000..a992cf5e Binary files /dev/null and b/test-toolkit/resources/mnist/model_eia/model.tar.gz differ diff --git a/test-toolkit/resources/mnist/model_eia/model_mnist.tar.gz b/test-toolkit/resources/mnist/model_eia/model_mnist.tar.gz new file mode 100644 index 00000000..6afd83a4 Binary files /dev/null and b/test-toolkit/resources/mnist/model_eia/model_mnist.tar.gz differ diff --git a/test-toolkit/resources/mnist/model_gpu/mnist.py b/test-toolkit/resources/mnist/model_gpu/mnist.py index 5d50b30a..e0a1dfbe 100644 --- a/test-toolkit/resources/mnist/model_gpu/mnist.py +++ b/test-toolkit/resources/mnist/model_gpu/mnist.py @@ -11,21 +11,16 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from __future__ import absolute_import -import argparse + import logging import os import sys -import cv2 as cv -import sagemaker_containers import torch -import torch.distributed as dist import torch.nn as nn import torch.nn.functional as F -import torch.optim as optim import torch.utils.data import torch.utils.data.distributed -from torchvision import datasets, transforms logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -54,174 +49,9 @@ def forward(self, x): return F.log_softmax(x, dim=1) -def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs): - logger.info("Get train data loader") - dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])) - train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None - return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None, - sampler=train_sampler, **kwargs) - - -def _get_test_data_loader(test_batch_size, training_dir, **kwargs): - logger.info("Get test data loader") - return torch.utils.data.DataLoader( - datasets.MNIST(training_dir, train=False, transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ])), - batch_size=test_batch_size, shuffle=True, **kwargs) - - -def _average_gradients(model): - # Gradient averaging. - size = float(dist.get_world_size()) - for param in model.parameters(): - dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) - param.grad.data /= size - - -def train(args): - is_distributed = len(args.hosts) > 1 and args.backend is not None - logger.debug("Distributed training - {}".format(is_distributed)) - use_cuda = (args.processor == 'gpu') or (args.num_gpus > 0) - logger.debug("Number of gpus available - {}".format(args.num_gpus)) - kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {} - device = torch.device("cuda" if use_cuda else "cpu") - - if is_distributed: - # Initialize the distributed environment. - world_size = len(args.hosts) - os.environ['WORLD_SIZE'] = str(world_size) - host_rank = args.hosts.index(args.current_host) - os.environ['RANK'] = str(host_rank) - dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size) - logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format( - args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format( - dist.get_rank(), args.num_gpus)) - - # set the seed for generating random numbers - torch.manual_seed(args.seed) - if use_cuda: - torch.cuda.manual_seed(args.seed) - - train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs) - test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs) - - # TODO: assert the logs when we move to the SDK local mode - logger.debug("Processes {}/{} ({:.0f}%) of train data".format( - len(train_loader.sampler), len(train_loader.dataset), - 100. * len(train_loader.sampler) / len(train_loader.dataset) - )) - - logger.debug("Processes {}/{} ({:.0f}%) of test data".format( - len(test_loader.sampler), len(test_loader.dataset), - 100. * len(test_loader.sampler) / len(test_loader.dataset) - )) - - model = Net().to(device) - if is_distributed and use_cuda: - # multi-machine multi-gpu case - logger.debug("Multi-machine multi-gpu: using DistributedDataParallel.") - model = torch.nn.parallel.DistributedDataParallel(model) - elif use_cuda: - # single-machine multi-gpu case - logger.debug("Single-machine multi-gpu: using DataParallel().cuda().") - model = torch.nn.DataParallel(model).to(device) - else: - # single-machine or multi-machine cpu case - logger.debug("Single-machine/multi-machine cpu: using DataParallel.") - model = torch.nn.DataParallel(model) - - optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) - - for epoch in range(1, args.epochs + 1): - model.train() - for batch_idx, (data, target) in enumerate(train_loader, 1): - data, target = data.to(device), target.to(device) - optimizer.zero_grad() - output = model(data) - loss = F.nll_loss(output, target) - loss.backward() - if is_distributed and not use_cuda: - # average gradients manually for multi-machine cpu case only - _average_gradients(model) - optimizer.step() - if batch_idx % args.log_interval == 0: - logger.debug('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( - epoch, batch_idx * len(data), len(train_loader.sampler), - 100. * batch_idx / len(train_loader), loss.item())) - test(model, test_loader, device) - save_model(model, args.model_dir) - - -def test(model, test_loader, device): - model.eval() - test_loss = 0 - correct = 0 - with torch.no_grad(): - for data, target in test_loader: - data, target = data.to(device), target.to(device) - output = model(data) - test_loss += F.nll_loss(output, target, size_average=None).item() # sum up batch loss - pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability - correct += pred.eq(target.view_as(pred)).sum().item() - - test_loss /= len(test_loader.dataset) - logger.debug('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( - test_loss, correct, len(test_loader.dataset), - 100. * correct / len(test_loader.dataset))) - - def model_fn(model_dir): logger.info('model_fn') model = torch.nn.DataParallel(Net()) with open(os.path.join(model_dir, 'model.pth'), 'rb') as f: model.load_state_dict(torch.load(f)) return model - - -def save_model(model, model_dir): - logger.info("Saving the model.") - path = os.path.join(model_dir, 'model.pth') - # recommended way from http://pytorch.org/docs/master/notes/serialization.html - torch.save(model.state_dict(), path) - - -if __name__ == '__main__': - # test opencv - print(cv.__version__) - - parser = argparse.ArgumentParser() - - # Data and model checkpoints directories - parser.add_argument('--batch-size', type=int, default=64, metavar='N', - help='input batch size for training (default: 64)') - parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', - help='input batch size for testing (default: 1000)') - parser.add_argument('--epochs', type=int, default=1, metavar='N', - help='number of epochs to train (default: 10)') - parser.add_argument('--lr', type=float, default=0.01, metavar='LR', - help='learning rate (default: 0.01)') - parser.add_argument('--momentum', type=float, default=0.5, metavar='M', - help='SGD momentum (default: 0.5)') - parser.add_argument('--seed', type=int, default=1, metavar='S', - help='random seed (default: 1)') - parser.add_argument('--log-interval', type=int, default=100, metavar='N', - help='how many batches to wait before logging training status') - parser.add_argument('--backend', type=str, default=None, - help='backend for distributed training') - parser.add_argument('--processor', type=str, default='cpu', - help='backend for distributed training') - - # Container environment - env = sagemaker_containers.training_env() - parser.add_argument('--hosts', type=list, default=env.hosts) - parser.add_argument('--current-host', type=str, default=env.current_host) - parser.add_argument('--model-dir', type=str, default=env.model_dir) - parser.add_argument('--data-dir', type=str, default=env.channel_input_dirs['training']) - parser.add_argument('--num-gpus', type=int, default=env.num_gpus) - - train(parser.parse_args()) diff --git a/test-toolkit/unit/test_default_inference_handler.py b/test-toolkit/unit/test_default_inference_handler.py index 818c6a40..e43e026e 100644 --- a/test-toolkit/unit/test_default_inference_handler.py +++ b/test-toolkit/unit/test_default_inference_handler.py @@ -15,6 +15,7 @@ import csv import json +import mock import numpy as np import pytest import torch @@ -40,7 +41,7 @@ def __call__(self, tensor): return 3 * tensor -@pytest.fixture(scope='session', name='tensor') +@pytest.fixture(scope="session", name="tensor") def fixture_tensor(): tensor = torch.rand(5, 10, 7, 9) return tensor.to(device) @@ -51,9 +52,14 @@ def inference_handler(): return default_inference_handler.DefaultPytorchInferenceHandler() +@pytest.fixture() +def eia_inference_handler(): + return default_inference_handler.DefaultPytorchInferenceHandler() + + def test_default_model_fn(inference_handler): with pytest.raises(NotImplementedError): - inference_handler.default_model_fn('model_dir') + inference_handler.default_model_fn("model_dir") def test_default_input_fn_json(inference_handler, tensor): @@ -67,7 +73,7 @@ def test_default_input_fn_json(inference_handler, tensor): def test_default_input_fn_csv(inference_handler): array = [[1, 2, 3], [4, 5, 6]] str_io = StringIO() - csv.writer(str_io, delimiter=',').writerows(array) + csv.writer(str_io, delimiter=",").writerows(array) deserialized_np_array = inference_handler.default_input_fn(str_io.getvalue(), content_types.CSV) @@ -78,7 +84,7 @@ def test_default_input_fn_csv(inference_handler): def test_default_input_fn_csv_bad_columns(inference_handler): str_io = StringIO() - csv_writer = csv.writer(str_io, delimiter=',') + csv_writer = csv.writer(str_io, delimiter=",") csv_writer.writerow([1, 2, 3]) csv_writer.writerow([1, 2, 3, 4]) @@ -97,7 +103,7 @@ def test_default_input_fn_npy(inference_handler, tensor): def test_default_input_fn_bad_content_type(inference_handler): with pytest.raises(errors.UnsupportedFormatError): - inference_handler.default_input_fn('', 'application/not_supported') + inference_handler.default_input_fn("", "application/not_supported") def test_default_predict_fn(inference_handler, tensor): @@ -162,7 +168,7 @@ def test_default_output_fn_csv_float(inference_handler): def test_default_output_fn_bad_accept(inference_handler): with pytest.raises(errors.UnsupportedFormatError): - inference_handler.default_output_fn('', 'application/not_supported') + inference_handler.default_output_fn("", "application/not_supported") @pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available") @@ -171,4 +177,34 @@ def test_default_output_fn_gpu(inference_handler): output = inference_handler.default_output_fn(tensor_gpu, content_types.CSV) - assert '1,2,3\n4,5,6\n'.encode("utf-8") == output + assert "1,2,3\n4,5,6\n".encode("utf-8") == output + + +def test_eia_default_model_fn(eia_inference_handler): + with mock.patch("sagemaker_pytorch_serving_container.default_inference_handler.os") as mock_os: + mock_os.getenv.return_value = "true" + mock_os.path.join.return_value = "model_dir" + mock_os.path.exists.return_value = True + with mock.patch("torch.jit.load") as mock_torch: + mock_torch.return_value = DummyModel() + model = eia_inference_handler.default_model_fn("model_dir") + assert model is not None + + +def test_eia_default_model_fn_error(eia_inference_handler): + with mock.patch("sagemaker_pytorch_serving_container.default_inference_handler.os") as mock_os: + mock_os.getenv.return_value = "true" + mock_os.path.join.return_value = "model_dir" + mock_os.path.exists.return_value = False + with pytest.raises(FileNotFoundError): + eia_inference_handler.default_model_fn("model_dir") + + +def test_eia_default_predict_fn(eia_inference_handler, tensor): + model = DummyModel() + with mock.patch("sagemaker_pytorch_serving_container.default_inference_handler.os") as mock_os: + mock_os.getenv.return_value = "true" + with mock.patch("torch.jit.optimized_execution") as mock_torch: + mock_torch.__enter__.return_value = "dummy" + eia_inference_handler.default_predict_fn(tensor, model) + mock_torch.assert_called_once() diff --git a/test-toolkit/utils/__init__.py b/test-toolkit/utils/__init__.py index 199e66b9..79cb9cdf 100644 --- a/test-toolkit/utils/__init__.py +++ b/test-toolkit/utils/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of diff --git a/test-toolkit/utils/image_utils.py b/test-toolkit/utils/image_utils.py index 0a6ecf69..2cb696ce 100644 --- a/test-toolkit/utils/image_utils.py +++ b/test-toolkit/utils/image_utils.py @@ -18,50 +18,40 @@ CYAN_COLOR = '\033[36m' END_COLOR = '\033[0m' +DLC_AWS_ID = '763104351884' -def build_base_image(framework_name, framework_version, py_version, - processor, base_image_tag, cwd='.'): - base_image_uri = get_base_image_uri(framework_name, base_image_tag) +def build_image(framework_version, dockerfile, image_uri, region, cwd='.'): + _check_call('python setup.py sdist') - dockerfile_location = os.path.join('docker', framework_version, 'base', - 'Dockerfile.{}'.format(processor)) + if 'dlc' in dockerfile: + ecr_login(region, DLC_AWS_ID) - subprocess.check_call(['docker', 'build', '-t', base_image_uri, - '-f', dockerfile_location, '--build-arg', - 'py_version={}'.format(py_version[-1]), cwd], cwd=cwd) - print('created image {}'.format(base_image_uri)) - return base_image_uri - - -def build_image(framework_name, framework_version, py_version, processor, tag, cwd='.'): - _check_call('python setup.py bdist_wheel') - - image_uri = get_image_uri(framework_name, tag) - - dockerfile_location = os.path.join('docker', framework_version, 'final', - 'Dockerfile.{}'.format(processor)) + dockerfile_location = os.path.join('test-toolkit', 'docker', framework_version, dockerfile) subprocess.check_call( ['docker', 'build', '-t', image_uri, '-f', dockerfile_location, '--build-arg', - 'py_version={}'.format(py_version[-1]), cwd], cwd=cwd) + 'region={}'.format(region), cwd], cwd=cwd) print('created image {}'.format(image_uri)) return image_uri -def get_base_image_uri(framework_name, base_image_tag): - return '{}-base:{}'.format(framework_name, base_image_tag) +def push_image(ecr_image, region, aws_id): + ecr_login(region, aws_id) + _check_call('docker push {}'.format(ecr_image)) -def get_image_uri(framework_name, tag): - return '{}:{}'.format(framework_name, tag) +def ecr_login(region, aws_id): + login = _check_call('aws ecr get-login --registry-ids {} '.format(aws_id) + + '--no-include-email --region {}'.format(region)) + _check_call(login.decode('utf-8').rstrip('\n')) def _check_call(cmd, *popenargs, **kwargs): if isinstance(cmd, str): cmd = cmd.split(" ") _print_cmd(cmd) - subprocess.check_call(cmd, *popenargs, **kwargs) + return subprocess.check_output(cmd, *popenargs, **kwargs) def _print_cmd(cmd): diff --git a/test-toolkit/utils/local_mode_utils.py b/test-toolkit/utils/local_mode_utils.py index 23e8945a..acd1197e 100644 --- a/test-toolkit/utils/local_mode_utils.py +++ b/test-toolkit/utils/local_mode_utils.py @@ -18,7 +18,7 @@ import tarfile import time -from test.integration import resources_path +from integration import resources_path LOCK_PATH = os.path.join(resources_path, 'local_mode_lock') diff --git a/tox.ini b/tox.ini index 570c07ab..fba5050a 100644 --- a/tox.ini +++ b/tox.ini @@ -51,7 +51,7 @@ passenv = AWS_DEFAULT_REGION commands = coverage run --source sagemaker_pytorch_serving_container -m pytest {posargs} - {env:IGNORE_COVERAGE:} coverage report --fail-under=90 --include *sagemaker_pytorch_serving_container* + {env:IGNORE_COVERAGE:} coverage report --fail-under=95 --include *sagemaker_pytorch_serving_container* deps = coverage pytest @@ -69,7 +69,7 @@ deps = future [testenv:flake8] -basepython = python +basepython = python3 deps = flake8 flake8-future-import