diff --git a/README.md b/README.md index dbe22668..18a2fc3e 100644 --- a/README.md +++ b/README.md @@ -620,24 +620,24 @@ To deploy a Multi-Model endpoint with TFS container, please start the container ### Multi-Model Interfaces We provide four different interfaces for user to interact with a Multi-Model Mode container: -+---------------------+---------------------------------+---------------------------------------------+ -| Functionality | Request | Response/Actions | -+---------------------+---------------------------------+---------------------------------------------+ -| List A Single Model | GET /models/{model_name} | Information about the specified model | -+---------------------+---------------------------------+---------------------------------------------+ -| List All Models | GET /models | List of Information about all loaded models | -+---------------------+---------------------------------+---------------------------------------------+ -| | POST /models | Load model with "model_name" from | -| | data = { | specified url | -| Load A Model | "model_name": , | | -| | "url": | | -| | } | | -+---------------------+---------------------------------+---------------------------------------------+ -| Make Invocations | POST /models/{model_name}/invoke| Return inference result from | -| | data = | the specified model | -+---------------------+---------------------------------+---------------------------------------------+ -| Unload A Model | DELETE /models/{model_name} | Unload the specified model | -+---------------------+---------------------------------+---------------------------------------------+ + +---------------------+---------------------------------+---------------------------------------------+ + | Functionality | Request | Response/Actions | + +---------------------+---------------------------------+---------------------------------------------+ + | List A Single Model | GET /models/{model_name} | Information about the specified model | + +---------------------+---------------------------------+---------------------------------------------+ + | List All Models | GET /models | List of Information about all loaded models | + +---------------------+---------------------------------+---------------------------------------------+ + | | POST /models | Load model with "model_name" from | + | | data = { | specified url | + | Load A Model | "model_name": , | | + | | "url": | | + | | } | | + +---------------------+---------------------------------+---------------------------------------------+ + | Make Invocations | POST /models/{model_name}/invoke| Return inference result from | + | | data = | the specified model | + +---------------------+---------------------------------+---------------------------------------------+ + | Unload A Model | DELETE /models/{model_name} | Unload the specified model | + +---------------------+---------------------------------+---------------------------------------------+ ### Maximum Number of Models Also please note the environment variable ``SAGEMAKER_SAFE_PORT_RANGE`` will limit the number of models that can be loaded to the endpoint at the same time. 
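For illustration, the interfaces in the table above map onto plain HTTP calls against the container's bound port (``8080`` by default via ``SAGEMAKER_BIND_TO_PORT``). The sketch below uses the ``requests`` library; the model name and path are placeholders borrowed from the integration tests in this change, and the container is assumed to already be running in multi-model mode:

```python
import json

import requests

# Assumed local endpoint; the container binds to port 8080 by default.
# The model name and url below are illustrative placeholders.
MODELS_URL = "http://localhost:8080/models"
INVOKE_URL = "http://localhost:8080/models/{}/invoke"

# Load a model from a path inside the container
load = requests.post(
    MODELS_URL,
    data=json.dumps({"model_name": "half_plus_three",
                     "url": "/opt/ml/models/half_plus_three"}),
    headers={"Content-Type": "application/json"},
)
print(load.status_code, load.content)

# List all currently loaded models
print(requests.get(MODELS_URL).json())

# Invoke the loaded model
invoke = requests.post(
    INVOKE_URL.format("half_plus_three"),
    data=json.dumps({"instances": [1.0, 2.0, 5.0]}),
    headers={"Content-Type": "application/json",
             "X-Amzn-SageMaker-Custom-Attributes": "tfs-method=predict"},
)
print(invoke.json())

# Unload the model and release its ports
requests.delete(MODELS_URL + "/half_plus_three")
```

Loading a model reserves a REST/gRPC port pair from ``SAGEMAKER_SAFE_PORT_RANGE`` for a dedicated TensorFlow Serving process, and unloading releases that pair, which is why the size of the range caps the number of models that can be loaded at the same time.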
diff --git a/docker/build_artifacts/deep_learning_container.py b/docker/build_artifacts/deep_learning_container.py index b60ea9f2..1e82e61e 100644 --- a/docker/build_artifacts/deep_learning_container.py +++ b/docker/build_artifacts/deep_learning_container.py @@ -20,7 +20,7 @@ def _validate_instance_id(instance_id): """ Validate instance ID """ - instance_id_regex = r'^(i-\S{17})' + instance_id_regex = r"^(i-\S{17})" compiled_regex = re.compile(instance_id_regex) match = compiled_regex.match(instance_id) diff --git a/docker/build_artifacts/dockerd-entrypoint.py b/docker/build_artifacts/dockerd-entrypoint.py index 68e1e966..fc4ce388 100644 --- a/docker/build_artifacts/dockerd-entrypoint.py +++ b/docker/build_artifacts/dockerd-entrypoint.py @@ -17,6 +17,6 @@ import sys if not os.path.exists("/opt/ml/input/config"): - subprocess.call(['python', '/usr/local/bin/deep_learning_container.py', '&>/dev/null', '&']) + subprocess.call(["python", "/usr/local/bin/deep_learning_container.py", "&>/dev/null", "&"]) subprocess.check_call(shlex.split(' '.join(sys.argv[1:]))) diff --git a/docker/build_artifacts/sagemaker/multi_model_utils.py b/docker/build_artifacts/sagemaker/multi_model_utils.py index 6267a067..5d2c47f4 100644 --- a/docker/build_artifacts/sagemaker/multi_model_utils.py +++ b/docker/build_artifacts/sagemaker/multi_model_utils.py @@ -15,13 +15,13 @@ import time from contextlib import contextmanager -MODEL_CONFIG_FILE = '/sagemaker/model-config.cfg' -DEFAULT_LOCK_FILE = '/sagemaker/lock-file.lock' +MODEL_CONFIG_FILE = "/sagemaker/model-config.cfg" +DEFAULT_LOCK_FILE = "/sagemaker/lock-file.lock" @contextmanager def lock(path=DEFAULT_LOCK_FILE): - f = open(path, 'w') + f = open(path, "w") fd = f.fileno() fcntl.lockf(fd, fcntl.LOCK_EX) @@ -35,7 +35,7 @@ def lock(path=DEFAULT_LOCK_FILE): @contextmanager def timeout(seconds=60): def _raise_timeout_error(signum, frame): - raise Exception(408, 'Timed out after {} seconds'.format(seconds)) + raise Exception(408, "Timed out after {} seconds".format(seconds)) try: signal.signal(signal.SIGALRM, _raise_timeout_error) diff --git a/docker/build_artifacts/sagemaker/python_service.py b/docker/build_artifacts/sagemaker/python_service.py index cf586bd7..44b9498c 100644 --- a/docker/build_artifacts/sagemaker/python_service.py +++ b/docker/build_artifacts/sagemaker/python_service.py @@ -27,20 +27,20 @@ from multi_model_utils import lock, timeout, MultiModelException import tfs_utils -SAGEMAKER_MULTI_MODEL_ENABLED = os.environ.get('SAGEMAKER_MULTI_MODEL', 'false').lower() == 'true' -INFERENCE_SCRIPT_PATH = '/opt/ml/model/code/inference.py' +SAGEMAKER_MULTI_MODEL_ENABLED = os.environ.get("SAGEMAKER_MULTI_MODEL", "false").lower() == "true" +INFERENCE_SCRIPT_PATH = "/opt/ml/model/code/inference.py" -SAGEMAKER_BATCHING_ENABLED = os.environ.get('SAGEMAKER_TFS_ENABLE_BATCHING', 'false').lower() -MODEL_CONFIG_FILE_PATH = '/sagemaker/model-config.cfg' -TFS_GRPC_PORT = os.environ.get('TFS_GRPC_PORT') -TFS_REST_PORT = os.environ.get('TFS_REST_PORT') -SAGEMAKER_TFS_PORT_RANGE = os.environ.get('SAGEMAKER_SAFE_PORT_RANGE') +SAGEMAKER_BATCHING_ENABLED = os.environ.get("SAGEMAKER_TFS_ENABLE_BATCHING", "false").lower() +MODEL_CONFIG_FILE_PATH = "/sagemaker/model-config.cfg" +TFS_GRPC_PORT = os.environ.get("TFS_GRPC_PORT") +TFS_REST_PORT = os.environ.get("TFS_REST_PORT") +SAGEMAKER_TFS_PORT_RANGE = os.environ.get("SAGEMAKER_SAFE_PORT_RANGE") logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) -CUSTOM_ATTRIBUTES_HEADER = 'X-Amzn-SageMaker-Custom-Attributes' 
+CUSTOM_ATTRIBUTES_HEADER = "X-Amzn-SageMaker-Custom-Attributes" def default_handler(data, context): @@ -78,15 +78,15 @@ def __init__(self): else: self._handlers = default_handler - self._tfs_enable_batching = SAGEMAKER_BATCHING_ENABLED == 'true' - self._tfs_default_model_name = os.environ.get('TFS_DEFAULT_MODEL_NAME', "None") + self._tfs_enable_batching = SAGEMAKER_BATCHING_ENABLED == "true" + self._tfs_default_model_name = os.environ.get("TFS_DEFAULT_MODEL_NAME", "None") def on_post(self, req, res, model_name=None): log.info(req.uri) if model_name or "invocations" in req.uri: self._handle_invocation_post(req, res, model_name) else: - data = json.loads(req.stream.read().decode('utf-8')) + data = json.loads(req.stream.read().decode("utf-8")) self._handle_load_model_post(res, data) def _parse_sagemaker_port_range(self, port_range): @@ -96,37 +96,37 @@ def _parse_sagemaker_port_range(self, port_range): rest_port = lower grpc_port = (lower + upper) // 2 tfs_ports = { - 'rest_port': [port for port in range(rest_port, grpc_port)], - 'grpc_port': [port for port in range(grpc_port, upper)], + "rest_port": [port for port in range(rest_port, grpc_port)], + "grpc_port": [port for port in range(grpc_port, upper)], } return tfs_ports def _ports_available(self): with lock(): - rest_ports = self._tfs_ports['rest_port'] - grpc_ports = self._tfs_ports['grpc_port'] + rest_ports = self._tfs_ports["rest_port"] + grpc_ports = self._tfs_ports["grpc_port"] return len(rest_ports) > 0 and len(grpc_ports) > 0 def _handle_load_model_post(self, res, data): # noqa: C901 - model_name = data['model_name'] - base_path = data['url'] + model_name = data["model_name"] + base_path = data["url"] # model is already loaded if model_name in self._model_tfs_pid: res.status = falcon.HTTP_409 res.body = json.dumps({ - 'error': 'Model {} is already loaded.'.format(model_name) + "error": "Model {} is already loaded.".format(model_name) }) # check if there are available ports if not self._ports_available(): res.status = falcon.HTTP_507 res.body = json.dumps({ - 'error': 'Memory exhausted: no available ports to load the model.' + "error": "Memory exhausted: no available ports to load the model." 
}) with lock(): - self._model_tfs_rest_port[model_name] = self._tfs_ports['rest_port'].pop() - self._model_tfs_grpc_port[model_name] = self._tfs_ports['grpc_port'].pop() + self._model_tfs_rest_port[model_name] = self._tfs_ports["rest_port"].pop() + self._model_tfs_grpc_port[model_name] = self._tfs_ports["grpc_port"].pop() # validate model files are in the specified base_path if self.validate_model_dir(base_path): @@ -135,13 +135,13 @@ def _handle_load_model_post(self, res, data): # noqa: C901 self._import_custom_modules(model_name) tfs_config = tfs_utils.create_tfs_config_individual_model(model_name, base_path) - tfs_config_file = '/sagemaker/tfs-config/{}/model-config.cfg'.format(model_name) - log.info('tensorflow serving model config: \n%s\n', tfs_config) + tfs_config_file = "/sagemaker/tfs-config/{}/model-config.cfg".format(model_name) + log.info("tensorflow serving model config: \n%s\n", tfs_config) os.makedirs(os.path.dirname(tfs_config_file)) - with open(tfs_config_file, 'w') as f: + with open(tfs_config_file, "w") as f: f.write(tfs_config) - batching_config_file = '/sagemaker/batching/{}/batching-config.cfg'.format( + batching_config_file = "/sagemaker/batching/{}/batching-config.cfg".format( model_name) if self._tfs_enable_batching: tfs_utils.create_batching_config(batching_config_file) @@ -156,16 +156,16 @@ def _handle_load_model_post(self, res, data): # noqa: C901 p = subprocess.Popen(cmd.split()) self._wait_for_model(model_name) - log.info('started tensorflow serving (pid: %d)', p.pid) + log.info("started tensorflow serving (pid: %d)", p.pid) # update model name <-> tfs pid map self._model_tfs_pid[model_name] = p res.status = falcon.HTTP_200 res.body = json.dumps({ - 'success': - 'Successfully loaded model {}, ' - 'listening on rest port {} ' - 'and grpc port {}.'.format(model_name, + "success": + "Successfully loaded model {}, " + "listening on rest port {} " + "and grpc port {}.".format(model_name, self._model_tfs_rest_port, self._model_tfs_grpc_port,) }) @@ -183,22 +183,22 @@ def _handle_load_model_post(self, res, data): # noqa: C901 except FileExistsError as e: res.status = falcon.HTTP_409 res.body = json.dumps({ - 'error': 'Model {} is already loaded. {}'.format(model_name, str(e)) + "error": "Model {} is already loaded. 
{}".format(model_name, str(e)) }) except OSError as os_error: self._cleanup_config_file(tfs_config_file) self._cleanup_config_file(batching_config_file) if os_error.errno == 12: raise MultiModelException(falcon.HTTP_507, - 'Memory exhausted: ' - 'not enough memory to start TFS instance') + "Memory exhausted: " + "not enough memory to start TFS instance") else: raise MultiModelException(falcon.HTTP_500, os_error.strerror) else: res.status = falcon.HTTP_404 res.body = json.dumps({ - 'error': - 'Could not find valid base path {} for servable {}'.format(base_path, + "error": + "Could not find valid base path {} for servable {}".format(base_path, model_name) }) @@ -244,7 +244,7 @@ def _wait_for_model(self, model_name): session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries)) response = session.get(url) if response.status_code == 200: - versions = json.loads(response.content)['model_version_status'] + versions = json.loads(response.content)["model_version_status"] if all(version["state"] == "AVAILABLE" for version in versions): break except ConnectionError: @@ -256,7 +256,7 @@ def _handle_invocation_post(self, req, res, model_name=None): if model_name not in self._model_tfs_rest_port: res.status = falcon.HTTP_404 res.body = json.dumps({ - 'error': "Model {} is not loaded yet.".format(model_name) + "error": "Model {} is not loaded yet.".format(model_name) }) return else: @@ -271,7 +271,7 @@ def _handle_invocation_post(self, req, res, model_name=None): else: res.status = falcon.HTTP_400 res.body = json.dumps({ - 'error': 'Invocation request does not contain model name.' + "error": "Invocation request does not contain model name." }) else: data, context = tfs_utils.parse_request(req, self._tfs_rest_port, self._tfs_grpc_port, @@ -286,28 +286,28 @@ def _handle_invocation_post(self, req, res, model_name=None): else: res.body, res.content_type = self._handlers(data, context) except Exception as e: # pylint: disable=broad-except - log.exception('exception handling request: {}'.format(e)) + log.exception("exception handling request: {}".format(e)) res.status = falcon.HTTP_500 res.body = json.dumps({ - 'error': str(e) - }).encode('utf-8') # pylint: disable=E1101 + "error": str(e) + }).encode("utf-8") # pylint: disable=E1101 def _import_handlers(self, model_name=None): inference_script = INFERENCE_SCRIPT_PATH if model_name: inference_script = "/opt/ml/models/{}/model/code/inference.py".format(model_name) - spec = importlib.util.spec_from_file_location('inference', inference_script) + spec = importlib.util.spec_from_file_location("inference", inference_script) inference = importlib.util.module_from_spec(spec) spec.loader.exec_module(inference) _custom_handler, _custom_input_handler, _custom_output_handler = None, None, None - if hasattr(inference, 'handler'): + if hasattr(inference, "handler"): _custom_handler = inference.handler - elif hasattr(inference, 'input_handler') and hasattr(inference, 'output_handler'): + elif hasattr(inference, "input_handler") and hasattr(inference, "output_handler"): _custom_input_handler = inference.input_handler _custom_output_handler = inference.output_handler else: - raise NotImplementedError('Handlers are not implemented correctly in user script.') + raise NotImplementedError("Handlers are not implemented correctly in user script.") return _custom_handler, _custom_input_handler, _custom_output_handler @@ -325,69 +325,69 @@ def handler(data, context): def on_get(self, req, res, model_name=None): # pylint: disable=W0613 if model_name is None: models_info 
= {} - uri = 'http://localhost:{}/v1/models/{}' + uri = "http://localhost:{}/v1/models/{}" for model, port in self._model_tfs_rest_port.items(): try: info = json.loads(requests.get(uri.format(port, model)).content) models_info[model] = info except ValueError as e: - log.exception('exception handling request: {}'.format(e)) + log.exception("exception handling request: {}".format(e)) res.status = falcon.HTTP_500 res.body = json.dumps({ - 'error': str(e) - }).encode('utf-8') + "error": str(e) + }).encode("utf-8") res.status = falcon.HTTP_200 res.body = json.dumps(models_info) else: if model_name not in self._model_tfs_rest_port: res.status = falcon.HTTP_404 res.body = json.dumps({ - 'error': 'Model {} is loaded yet.'.format(model_name) - }).encode('utf-8') + "error": "Model {} is loaded yet.".format(model_name) + }).encode("utf-8") else: port = self._model_tfs_rest_port[model_name] - uri = 'http://localhost:{}/v1/models/{}'.format(port, model_name) + uri = "http://localhost:{}/v1/models/{}".format(port, model_name) try: info = requests.get(uri) res.status = falcon.HTTP_200 res.body = json.dumps({ - 'model': info - }).encode('utf-8') + "model": info + }).encode("utf-8") except ValueError as e: - log.exception('exception handling GET models request.') + log.exception("exception handling GET models request.") res.status = falcon.HTTP_500 res.body = json.dumps({ - 'error': str(e) - }).encode('utf-8') + "error": str(e) + }).encode("utf-8") def on_delete(self, req, res, model_name): # pylint: disable=W0613 if model_name not in self._model_tfs_pid: res.status = falcon.HTTP_404 res.body = json.dumps({ - 'error': 'Model {} is not loaded yet'.format(model_name) + "error": "Model {} is not loaded yet".format(model_name) }) else: try: self._model_tfs_pid[model_name].kill() - os.remove('/sagemaker/tfs-config/{}/model-config.cfg'.format(model_name)) - os.rmdir('/sagemaker/tfs-config/{}'.format(model_name)) + os.remove("/sagemaker/tfs-config/{}/model-config.cfg".format(model_name)) + os.rmdir("/sagemaker/tfs-config/{}".format(model_name)) release_rest_port = self._model_tfs_rest_port[model_name] release_grpc_port = self._model_tfs_grpc_port[model_name] with lock(): - bisect.insort(self._tfs_ports['rest_port'], release_rest_port) - bisect.insort(self._tfs_ports['grpc_port'], release_grpc_port) + bisect.insort(self._tfs_ports["rest_port"], release_rest_port) + bisect.insort(self._tfs_ports["grpc_port"], release_grpc_port) del self._model_tfs_rest_port[model_name] del self._model_tfs_grpc_port[model_name] del self._model_tfs_pid[model_name] res.status = falcon.HTTP_200 res.body = json.dumps({ - 'success': 'Successfully unloaded model {}.'.format(model_name) + "success": "Successfully unloaded model {}.".format(model_name) }) except OSError as error: res.status = falcon.HTTP_500 res.body = json.dumps({ - 'error': str(error) - }).encode('utf-8') + "error": str(error) + }).encode("utf-8") def validate_model_dir(self, model_path): # model base path doesn't exits @@ -425,13 +425,13 @@ def __init__(self): self._ping_resource = PingResource() def add_routes(self, application): - application.add_route('/ping', self._ping_resource) - application.add_route('/invocations', self._python_service_resource) + application.add_route("/ping", self._ping_resource) + application.add_route("/invocations", self._python_service_resource) if self._enable_model_manager: - application.add_route('/models', self._python_service_resource) - application.add_route('/models/{model_name}', self._python_service_resource) - 
application.add_route('/models/{model_name}/invoke', self._python_service_resource) + application.add_route("/models", self._python_service_resource) + application.add_route("/models/{model_name}", self._python_service_resource) + application.add_route("/models/{model_name}/invoke", self._python_service_resource) app = falcon.API() diff --git a/docker/build_artifacts/sagemaker/serve.py b/docker/build_artifacts/sagemaker/serve.py index 6bf1f8a6..231cbc59 100644 --- a/docker/build_artifacts/sagemaker/serve.py +++ b/docker/build_artifacts/sagemaker/serve.py @@ -23,161 +23,161 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) -JS_PING = 'js_content ping' -JS_INVOCATIONS = 'js_content invocations' -GUNICORN_PING = 'proxy_pass http://gunicorn_upstream/ping' -GUNICORN_INVOCATIONS = 'proxy_pass http://gunicorn_upstream/invocations' +JS_PING = "js_content ping" +JS_INVOCATIONS = "js_content invocations" +GUNICORN_PING = "proxy_pass http://gunicorn_upstream/ping" +GUNICORN_INVOCATIONS = "proxy_pass http://gunicorn_upstream/invocations" -PYTHON_LIB_PATH = '/opt/ml/model/code/lib' -REQUIREMENTS_PATH = '/opt/ml/model/code/requirements.txt' -INFERENCE_PATH = '/opt/ml/model/code/inference.py' +PYTHON_LIB_PATH = "/opt/ml/model/code/lib" +REQUIREMENTS_PATH = "/opt/ml/model/code/requirements.txt" +INFERENCE_PATH = "/opt/ml/model/code/inference.py" class ServiceManager(object): def __init__(self): - self._state = 'initializing' + self._state = "initializing" self._nginx = None self._tfs = None self._gunicorn = None self._gunicorn_command = None self._enable_python_service = os.path.exists(INFERENCE_PATH) - self._tfs_version = os.environ.get('SAGEMAKER_TFS_VERSION', '1.13') - self._nginx_http_port = os.environ.get('SAGEMAKER_BIND_TO_PORT', '8080') - self._nginx_loglevel = os.environ.get('SAGEMAKER_TFS_NGINX_LOGLEVEL', 'error') - self._tfs_default_model_name = os.environ.get('SAGEMAKER_TFS_DEFAULT_MODEL_NAME', 'None') - self._sagemaker_port_range = os.environ.get('SAGEMAKER_SAFE_PORT_RANGE', None) - self._tfs_config_path = '/sagemaker/model-config.cfg' - self._tfs_batching_config_path = '/sagemaker/batching-config.cfg' - - _enable_batching = os.environ.get('SAGEMAKER_TFS_ENABLE_BATCHING', 'false').lower() - _enable_multi_model_endpoint = os.environ.get('SAGEMAKER_MULTI_MODEL', - 'false').lower() - - if _enable_batching not in ['true', 'false']: - raise ValueError('SAGEMAKER_TFS_ENABLE_BATCHING must be "true" or "false"') - self._tfs_enable_batching = _enable_batching == 'true' - - if _enable_multi_model_endpoint not in ['true', 'false']: - raise ValueError('SAGEMAKER_MULTI_MODEL must be "true" or "false"') - self._tfs_enable_multi_model_endpoint = _enable_multi_model_endpoint == 'true' + self._tfs_version = os.environ.get("SAGEMAKER_TFS_VERSION", "1.13") + self._nginx_http_port = os.environ.get("SAGEMAKER_BIND_TO_PORT", "8080") + self._nginx_loglevel = os.environ.get("SAGEMAKER_TFS_NGINX_LOGLEVEL", "error") + self._tfs_default_model_name = os.environ.get("SAGEMAKER_TFS_DEFAULT_MODEL_NAME", "None") + self._sagemaker_port_range = os.environ.get("SAGEMAKER_SAFE_PORT_RANGE", None) + self._tfs_config_path = "/sagemaker/model-config.cfg" + self._tfs_batching_config_path = "/sagemaker/batching-config.cfg" + + _enable_batching = os.environ.get("SAGEMAKER_TFS_ENABLE_BATCHING", "false").lower() + _enable_multi_model_endpoint = os.environ.get("SAGEMAKER_MULTI_MODEL", + "false").lower() + + if _enable_batching not in ["true", "false"]: + raise ValueError("SAGEMAKER_TFS_ENABLE_BATCHING must be 
'true' or 'false'") + self._tfs_enable_batching = _enable_batching == "true" + + if _enable_multi_model_endpoint not in ["true", "false"]: + raise ValueError("SAGEMAKER_MULTI_MODEL must be 'true' or 'false'") + self._tfs_enable_multi_model_endpoint = _enable_multi_model_endpoint == "true" self._use_gunicorn = self._enable_python_service or self._tfs_enable_multi_model_endpoint if self._sagemaker_port_range is not None: - parts = self._sagemaker_port_range.split('-') + parts = self._sagemaker_port_range.split("-") low = int(parts[0]) hi = int(parts[1]) if low + 2 > hi: - raise ValueError('not enough ports available in SAGEMAKER_SAFE_PORT_RANGE ({})' + raise ValueError("not enough ports available in SAGEMAKER_SAFE_PORT_RANGE ({})" .format(self._sagemaker_port_range)) self._tfs_grpc_port = str(low) self._tfs_rest_port = str(low + 1) else: # just use the standard default ports - self._tfs_grpc_port = '9000' - self._tfs_rest_port = '8501' + self._tfs_grpc_port = "9000" + self._tfs_rest_port = "8501" # set environment variable for python service - os.environ['TFS_GRPC_PORT'] = self._tfs_grpc_port - os.environ['TFS_REST_PORT'] = self._tfs_rest_port + os.environ["TFS_GRPC_PORT"] = self._tfs_grpc_port + os.environ["TFS_REST_PORT"] = self._tfs_rest_port def _create_tfs_config(self): models = tfs_utils.find_models() if not models: - raise ValueError('no SavedModel bundles found!') + raise ValueError("no SavedModel bundles found!") - if self._tfs_default_model_name == 'None': + if self._tfs_default_model_name == "None": default_model = os.path.basename(models[0]) if default_model: self._tfs_default_model_name = default_model - log.info('using default model name: {}'.format(self._tfs_default_model_name)) + log.info("using default model name: {}".format(self._tfs_default_model_name)) else: - log.info('no default model detected') + log.info("no default model detected") # config (may) include duplicate 'config' keys, so we can't just dump a dict - config = 'model_config_list: {\n' + config = "model_config_list: {\n" for m in models: - config += ' config: {\n' - config += ' name: "{}",\n'.format(os.path.basename(m)) - config += ' base_path: "{}",\n'.format(m) - config += ' model_platform: "tensorflow"\n' - config += ' }\n' - config += '}\n' + config += " config: {\n" + config += " name: '{}',\n".format(os.path.basename(m)) + config += " base_path: '{}',\n".format(m) + config += " model_platform: 'tensorflow'\n" + config += " }\n" + config += "}\n" - log.info('tensorflow serving model config: \n%s\n', config) + log.info("tensorflow serving model config: \n%s\n", config) - with open('/sagemaker/model-config.cfg', 'w') as f: + with open("/sagemaker/model-config.cfg", "w") as f: f.write(config) def _setup_gunicorn(self): python_path_content = [] - python_path_option = '' + python_path_option = "" if self._enable_python_service: lib_path_exists = os.path.exists(PYTHON_LIB_PATH) requirements_exists = os.path.exists(REQUIREMENTS_PATH) - python_path_content = ['/opt/ml/model/code'] - python_path_option = '--pythonpath ' + python_path_content = ["/opt/ml/model/code"] + python_path_option = "--pythonpath " if lib_path_exists: python_path_content.append(PYTHON_LIB_PATH) if requirements_exists: if lib_path_exists: - log.warning('loading modules in "{}", ignoring requirements.txt' + log.warning("loading modules in '{}', ignoring requirements.txt" .format(PYTHON_LIB_PATH)) else: - log.info('installing packages from requirements.txt...') - pip_install_cmd = 'pip3 install -r {}'.format(REQUIREMENTS_PATH) + 
log.info("installing packages from requirements.txt...") + pip_install_cmd = "pip3 install -r {}".format(REQUIREMENTS_PATH) try: subprocess.check_call(pip_install_cmd.split()) except subprocess.CalledProcessError: - log.error('failed to install required packages, exiting.') + log.error("failed to install required packages, exiting.") self._stop() - raise ChildProcessError('failed to install required packages.') + raise ChildProcessError("failed to install required packages.") gunicorn_command = ( - 'gunicorn -b unix:/tmp/gunicorn.sock -k gevent --chdir /sagemaker ' - '{}{} -e TFS_GRPC_PORT={} -e SAGEMAKER_MULTI_MODEL={} -e SAGEMAKER_SAFE_PORT_RANGE={} ' - 'python_service:app').format(python_path_option, ','.join(python_path_content), + "gunicorn -b unix:/tmp/gunicorn.sock -k gevent --chdir /sagemaker " + "{}{} -e TFS_GRPC_PORT={} -e SAGEMAKER_MULTI_MODEL={} -e SAGEMAKER_SAFE_PORT_RANGE={} " + "python_service:app").format(python_path_option, ",".join(python_path_content), self._tfs_grpc_port, self._tfs_enable_multi_model_endpoint, self._sagemaker_port_range) - log.info('gunicorn command: {}'.format(gunicorn_command)) + log.info("gunicorn command: {}".format(gunicorn_command)) self._gunicorn_command = gunicorn_command def _create_nginx_config(self): template = self._read_nginx_template() - pattern = re.compile(r'%(\w+)%') + pattern = re.compile(r"%(\w+)%") template_values = { - 'TFS_VERSION': self._tfs_version, - 'TFS_REST_PORT': self._tfs_rest_port, - 'TFS_DEFAULT_MODEL_NAME': self._tfs_default_model_name, - 'NGINX_HTTP_PORT': self._nginx_http_port, - 'NGINX_LOG_LEVEL': self._nginx_loglevel, - 'FORWARD_PING_REQUESTS': GUNICORN_PING if self._use_gunicorn else JS_PING, - 'FORWARD_INVOCATION_REQUESTS': GUNICORN_INVOCATIONS if self._use_gunicorn + "TFS_VERSION": self._tfs_version, + "TFS_REST_PORT": self._tfs_rest_port, + "TFS_DEFAULT_MODEL_NAME": self._tfs_default_model_name, + "NGINX_HTTP_PORT": self._nginx_http_port, + "NGINX_LOG_LEVEL": self._nginx_loglevel, + "FORWARD_PING_REQUESTS": GUNICORN_PING if self._use_gunicorn else JS_PING, + "FORWARD_INVOCATION_REQUESTS": GUNICORN_INVOCATIONS if self._use_gunicorn else JS_INVOCATIONS, } config = pattern.sub(lambda x: template_values[x.group(1)], template) - log.info('nginx config: \n%s\n', config) + log.info("nginx config: \n%s\n", config) - with open('/sagemaker/nginx.conf', 'w') as f: + with open("/sagemaker/nginx.conf", "w") as f: f.write(config) def _read_nginx_template(self): - with open('/sagemaker/nginx.conf.template', 'r') as f: + with open("/sagemaker/nginx.conf.template", "r") as f: template = f.read() if not template: - raise ValueError('failed to read nginx.conf.template') + raise ValueError("failed to read nginx.conf.template") return template def _start_tfs(self): - self._log_version('tensorflow_model_server --version', 'tensorflow version info:') + self._log_version("tensorflow_model_server --version", "tensorflow version info:") cmd = tfs_utils.tfs_command( self._tfs_grpc_port, self._tfs_rest_port, @@ -185,37 +185,37 @@ def _start_tfs(self): self._tfs_enable_batching, self._tfs_batching_config_path, ) - log.info('tensorflow serving command: {}'.format(cmd)) + log.info("tensorflow serving command: {}".format(cmd)) p = subprocess.Popen(cmd.split()) - log.info('started tensorflow serving (pid: %d)', p.pid) + log.info("started tensorflow serving (pid: %d)", p.pid) self._tfs = p def _start_gunicorn(self): - self._log_version('gunicorn --version', 'gunicorn version info:') + self._log_version("gunicorn --version", "gunicorn version 
info:") env = os.environ.copy() - env['TFS_DEFAULT_MODEL_NAME'] = self._tfs_default_model_name + env["TFS_DEFAULT_MODEL_NAME"] = self._tfs_default_model_name p = subprocess.Popen(self._gunicorn_command.split(), env=env) - log.info('started gunicorn (pid: %d)', p.pid) + log.info("started gunicorn (pid: %d)", p.pid) self._gunicorn = p def _start_nginx(self): - self._log_version('/usr/sbin/nginx -V', 'nginx version info:') - p = subprocess.Popen('/usr/sbin/nginx -c /sagemaker/nginx.conf'.split()) - log.info('started nginx (pid: %d)', p.pid) + self._log_version("/usr/sbin/nginx -V", "nginx version info:") + p = subprocess.Popen("/usr/sbin/nginx -c /sagemaker/nginx.conf".split()) + log.info("started nginx (pid: %d)", p.pid) self._nginx = p def _log_version(self, command, message): try: output = subprocess.check_output( command.split(), - stderr=subprocess.STDOUT).decode('utf-8', 'backslashreplace').strip() - log.info('{}\n{}'.format(message, output)) + stderr=subprocess.STDOUT).decode("utf-8", "backslashreplace").strip() + log.info("{}\n{}".format(message, output)) except subprocess.CalledProcessError: - log.warning('failed to run command: %s', command) + log.warning("failed to run command: %s", command) def _stop(self, *args): # pylint: disable=W0613 - self._state = 'stopping' - log.info('stopping services') + self._state = "stopping" + log.info("stopping services") try: os.kill(self._nginx.pid, signal.SIGQUIT) except OSError: @@ -230,19 +230,19 @@ def _stop(self, *args): # pylint: disable=W0613 except OSError: pass - self._state = 'stopped' - log.info('stopped') + self._state = "stopped" + log.info("stopped") def _wait_for_gunicorn(self): while True: - if os.path.exists('/tmp/gunicorn.sock'): - log.info('gunicorn server is ready!') + if os.path.exists("/tmp/gunicorn.sock"): + log.info("gunicorn server is ready!") return @contextmanager def _timeout(self, seconds): def _raise_timeout_error(signum, frame): - raise TimeoutError('time out after {} seconds'.format(seconds)) + raise TimeoutError("time out after {} seconds".format(seconds)) try: signal.signal(signal.SIGALRM, _raise_timeout_error) @@ -252,12 +252,12 @@ def _raise_timeout_error(signum, frame): signal.alarm(0) def start(self): - log.info('starting services') - self._state = 'starting' + log.info("starting services") + self._state = "starting" signal.signal(signal.SIGTERM, self._stop) if self._tfs_enable_multi_model_endpoint: - log.info('multi-model endpoint is enabled, TFS model servers will be started later') + log.info("multi-model endpoint is enabled, TFS model servers will be started later") else: tfs_utils.create_tfs_config( self._tfs_default_model_name, @@ -269,7 +269,7 @@ def start(self): self._create_nginx_config() if self._tfs_enable_batching: - log.info('batching is enabled') + log.info("batching is enabled") tfs_utils.create_batching_config(self._tfs_batching_config_path) if self._use_gunicorn: @@ -280,30 +280,30 @@ def start(self): self._wait_for_gunicorn() self._start_nginx() - self._state = 'started' + self._state = "started" while True: pid, status = os.wait() - if self._state != 'started': + if self._state != "started": break if pid == self._nginx.pid: - log.warning('unexpected nginx exit (status: {}). restarting.'.format(status)) + log.warning("unexpected nginx exit (status: {}). restarting.".format(status)) self._start_nginx() elif pid == self._tfs.pid: log.warning( - 'unexpected tensorflow serving exit (status: {}). restarting.'.format(status)) + "unexpected tensorflow serving exit (status: {}). 
restarting.".format(status)) self._start_tfs() elif self._gunicorn and pid == self._gunicorn.pid: - log.warning('unexpected gunicorn exit (status: {}). restarting.' + log.warning("unexpected gunicorn exit (status: {}). restarting." .format(status)) self._start_gunicorn() self._stop() -if __name__ == '__main__': +if __name__ == "__main__": ServiceManager().start() diff --git a/docker/build_artifacts/sagemaker/tfs_utils.py b/docker/build_artifacts/sagemaker/tfs_utils.py index 42247f9f..67d61f95 100644 --- a/docker/build_artifacts/sagemaker/tfs_utils.py +++ b/docker/build_artifacts/sagemaker/tfs_utils.py @@ -21,13 +21,13 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) -DEFAULT_CONTENT_TYPE = 'application/json' -DEFAULT_ACCEPT_HEADER = 'application/json' -CUSTOM_ATTRIBUTES_HEADER = 'X-Amzn-SageMaker-Custom-Attributes' +DEFAULT_CONTENT_TYPE = "application/json" +DEFAULT_ACCEPT_HEADER = "application/json" +CUSTOM_ATTRIBUTES_HEADER = "X-Amzn-SageMaker-Custom-Attributes" -Context = namedtuple('Context', - 'model_name, model_version, method, rest_uri, grpc_port, ' - 'custom_attributes, request_content_type, accept_header, content_length') +Context = namedtuple("Context", + "model_name, model_version, method, rest_uri, grpc_port, " + "custom_attributes, request_content_type, accept_header, content_length") def parse_request(req, rest_port, grpc_port, default_model_name, model_name=None): @@ -35,16 +35,16 @@ def parse_request(req, rest_port, grpc_port, default_model_name, model_name=None tfs_uri = make_tfs_uri(rest_port, tfs_attributes, default_model_name, model_name) if not model_name: - model_name = tfs_attributes.get('tfs-model-name') + model_name = tfs_attributes.get("tfs-model-name") context = Context(model_name, - tfs_attributes.get('tfs-model-version'), - tfs_attributes.get('tfs-method'), + tfs_attributes.get("tfs-model-version"), + tfs_attributes.get("tfs-method"), tfs_uri, grpc_port, req.get_header(CUSTOM_ATTRIBUTES_HEADER), - req.get_header('Content-Type') or DEFAULT_CONTENT_TYPE, - req.get_header('Accept') or DEFAULT_ACCEPT_HEADER, + req.get_header("Content-Type") or DEFAULT_CONTENT_TYPE, + req.get_header("Accept") or DEFAULT_ACCEPT_HEADER, req.content_length) data = req.stream @@ -55,13 +55,13 @@ def make_tfs_uri(port, attributes, default_model_name, model_name=None): log.info("sagemaker tfs attributes: \n{}".format(attributes)) tfs_model_name = model_name or attributes.get("tfs-model-name", default_model_name) - tfs_model_version = attributes.get('tfs-model-version') - tfs_method = attributes.get('tfs-method', 'predict') + tfs_model_version = attributes.get("tfs-model-version") + tfs_method = attributes.get("tfs-method", "predict") - uri = 'http://localhost:{}/v1/models/{}'.format(port, tfs_model_name) + uri = "http://localhost:{}/v1/models/{}".format(port, tfs_model_name) if tfs_model_version: - uri += '/versions/' + tfs_model_version - uri += ':' + tfs_method + uri += "/versions/" + tfs_model_version + uri += ":" + tfs_method return uri @@ -75,13 +75,13 @@ def parse_tfs_custom_attributes(req): def create_tfs_config_individual_model(model_name, base_path): - config = 'model_config_list: {\n' - config += ' config: {\n' - config += ' name: "{}",\n'.format(model_name) - config += ' base_path: "{}",\n'.format(base_path) - config += ' model_platform: "tensorflow"\n' - config += ' }\n' - config += '}\n' + config = "model_config_list: {\n" + config += " config: {\n" + config += " name: '{}',\n".format(model_name) + config += " base_path: '{}',\n".format(base_path) 
+ config += " model_platform: 'tensorflow'\n" + config += " }\n" + config += "}\n" return config @@ -91,25 +91,25 @@ def create_tfs_config( ): models = find_models() if not models: - raise ValueError('no SavedModel bundles found!') + raise ValueError("no SavedModel bundles found!") - if tfs_default_model_name == 'None': + if tfs_default_model_name == "None": default_model = os.path.basename(models[0]) if default_model: tfs_default_model_name = default_model - log.info('using default model name: {}'.format(tfs_default_model_name)) + log.info("using default model name: {}".format(tfs_default_model_name)) else: - log.info('no default model detected') + log.info("no default model detected") # config (may) include duplicate 'config' keys, so we can't just dump a dict - config = 'model_config_list: {\n' + config = "model_config_list: {\n" for m in models: - config += ' config: {\n' - config += ' name: "{}",\n'.format(os.path.basename(m)) - config += ' base_path: "{}",\n'.format(m) - config += ' model_platform: "tensorflow"\n' - config += ' }\n' - config += '}\n' + config += " config: {\n" + config += " name: '{}',\n".format(os.path.basename(m)) + config += " base_path: '{}',\n".format(m) + config += " model_platform: 'tensorflow'\n" + config += " }\n" + config += "}\n" with open(tfs_config_path, 'w') as f: f.write(config) @@ -131,12 +131,12 @@ def tfs_command(tfs_grpc_port, def find_models(): - base_path = '/opt/ml/model' + base_path = "/opt/ml/model" models = [] for f in _find_saved_model_files(base_path): - parts = f.split('/') - if len(parts) >= 6 and re.match(r'^\d+$', parts[-2]): - model_path = '/'.join(parts[0:-2]) + parts = f.split("/") + if len(parts) >= 6 and re.match(r"^\d+$", parts[-2]): + model_path = "/".join(parts[0:-2]) if model_path not in models: models.append(model_path) return models @@ -147,7 +147,7 @@ def _find_saved_model_files(path): if e.is_dir(): yield from _find_saved_model_files(os.path.join(path, e.name)) else: - if e.name == 'saved_model.pb': + if e.name == "saved_model.pb": yield os.path.join(path, e.name) @@ -169,39 +169,39 @@ def __init__(self, key, env_var, value, defaulted_message): cpu_count = multiprocessing.cpu_count() batching_parameters = [ - _BatchingParameter('max_batch_size', 'SAGEMAKER_TFS_MAX_BATCH_SIZE', 8, + _BatchingParameter("max_batch_size", "SAGEMAKER_TFS_MAX_BATCH_SIZE", 8, "max_batch_size defaulted to {}. Set {} to override default. " "Tuning this parameter may yield better performance."), - _BatchingParameter('batch_timeout_micros', 'SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS', 1000, + _BatchingParameter("batch_timeout_micros", "SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS", 1000, "batch_timeout_micros defaulted to {}. Set {} to override " "default. Tuning this parameter may yield better performance."), - _BatchingParameter('num_batch_threads', 'SAGEMAKER_TFS_NUM_BATCH_THREADS', + _BatchingParameter("num_batch_threads", "SAGEMAKER_TFS_NUM_BATCH_THREADS", cpu_count, "num_batch_threads defaulted to {}," "the number of CPUs. Set {} to override default."), - _BatchingParameter('max_enqueued_batches', 'SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES', + _BatchingParameter("max_enqueued_batches", "SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES", # Batch limits number of concurrent requests, which limits number # of enqueued batches, so this can be set high for Batch - 100000000 if 'SAGEMAKER_BATCH' in os.environ else cpu_count, + 100000000 if "SAGEMAKER_BATCH" in os.environ else cpu_count, "max_enqueued_batches defaulted to {}. Set {} to override default. 
" "Tuning this parameter may be necessary to tune out-of-memory " "errors occur."), ] - warning_message = '' + warning_message = "" for batching_parameter in batching_parameters: if batching_parameter.env_var in os.environ: batching_parameter.value = os.environ[batching_parameter.env_var] else: warning_message += batching_parameter.defaulted_message.format( batching_parameter.value, batching_parameter.env_var) - warning_message += '\n' + warning_message += "\n" if warning_message: log.warning(warning_message) - config = '' + config = "" for batching_parameter in batching_parameters: - config += '%s { value: %s }\n' % (batching_parameter.key, batching_parameter.value) + config += "%s { value: %s }\n" % (batching_parameter.key, batching_parameter.value) - log.info('batching config: \n%s\n', config) - with open(batching_config_file, 'w') as f: + log.info("batching config: \n%s\n", config) + with open(batching_config_file, "w") as f: f.write(config) diff --git a/test/integration/local/conftest.py b/test/integration/local/conftest.py index f610329b..050ba552 100644 --- a/test/integration/local/conftest.py +++ b/test/integration/local/conftest.py @@ -13,51 +13,51 @@ import pytest -FRAMEWORK_LATEST_VERSION = '1.13' -TFS_DOCKER_BASE_NAME = 'sagemaker-tensorflow-serving' +FRAMEWORK_LATEST_VERSION = "1.13" +TFS_DOCKER_BASE_NAME = "sagemaker-tensorflow-serving" def pytest_addoption(parser): - parser.addoption('--docker-base-name', default=TFS_DOCKER_BASE_NAME) - parser.addoption('--framework-version', default=FRAMEWORK_LATEST_VERSION, required=True) - parser.addoption('--processor', default='cpu', choices=['cpu', 'gpu']) - parser.addoption('--tag') + parser.addoption("--docker-base-name", default=TFS_DOCKER_BASE_NAME) + parser.addoption("--framework-version", default=FRAMEWORK_LATEST_VERSION, required=True) + parser.addoption("--processor", default="cpu", choices=["cpu", "gpu"]) + parser.addoption("--tag") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def docker_base_name(request): - return request.config.getoption('--docker-base-name') + return request.config.getoption("--docker-base-name") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def framework_version(request): - return request.config.getoption('--framework-version') + return request.config.getoption("--framework-version") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def processor(request): - return request.config.getoption('--processor') + return request.config.getoption("--processor") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def runtime_config(request, processor): - if processor == 'gpu': - return '--runtime=nvidia ' + if processor == "gpu": + return "--runtime=nvidia " else: - return '' + return "" -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def tag(request, framework_version, processor): - image_tag = request.config.getoption('--tag') + image_tag = request.config.getoption("--tag") if not image_tag: - image_tag = '{}-{}'.format(framework_version, processor) + image_tag = "{}-{}".format(framework_version, processor) return image_tag @pytest.fixture(autouse=True) def skip_by_device_type(request, processor): is_gpu = processor == "gpu" - if (request.node.get_closest_marker('skip_gpu') and is_gpu) or \ - (request.node.get_closest_marker('skip_cpu') and not is_gpu): - pytest.skip('Skipping because running on \'{}\' instance'.format(processor)) + if (request.node.get_closest_marker("skip_gpu") and is_gpu) or \ + 
(request.node.get_closest_marker("skip_cpu") and not is_gpu): + pytest.skip("Skipping because running on \"{}\" instance".format(processor)) diff --git a/test/integration/local/multi_model_endpoint_test_utils.py b/test/integration/local/multi_model_endpoint_test_utils.py index bce63250..508b6615 100644 --- a/test/integration/local/multi_model_endpoint_test_utils.py +++ b/test/integration/local/multi_model_endpoint_test_utils.py @@ -11,50 +11,49 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -import encodings import json import requests -INVOCATION_URL = 'http://localhost:8080/models/{}/invoke' -MODELS_URL = 'http://localhost:8080/models' -DELETE_MODEL_URL = 'http://localhost:8080/models/{}' +INVOCATION_URL = "http://localhost:8080/models/{}/invoke" +MODELS_URL = "http://localhost:8080/models" +DELETE_MODEL_URL = "http://localhost:8080/models/{}" -def make_headers(content_type='application/json', method='predict'): +def make_headers(content_type="application/json", method="predict"): headers = { - 'Content-Type': content_type, - 'X-Amzn-SageMaker-Custom-Attributes': 'tfs-method=%s' % method + "Content-Type": content_type, + "X-Amzn-SageMaker-Custom-Attributes": "tfs-method=%s" % method } return headers -def make_invocation_request(data, model_name, content_type='application/json'): +def make_invocation_request(data, model_name, content_type="application/json"): headers = { - 'Content-Type': content_type, - 'X-Amzn-SageMaker-Custom-Attributes': 'tfs-method=predict' + "Content-Type": content_type, + "X-Amzn-SageMaker-Custom-Attributes": "tfs-method=predict" } response = requests.post(INVOCATION_URL.format(model_name), data=data, headers=headers) - return response.status_code, response.content.decode(encodings.utf_8.getregentry().name) + return response.status_code, response.content.decode("utf-8") def make_list_model_request(): response = requests.get(MODELS_URL) - return response.status_code, response.content.decode(encodings.utf_8.getregentry().name) + return response.status_code, response.content.decode("utf-8") def make_get_model_request(model_name): - response = requests.get(MODELS_URL + '/{}'.format(model_name)) - return response.status_code, response.content.decode(encodings.utf_8.getregentry().name) + response = requests.get(MODELS_URL + "/{}".format(model_name)) + return response.status_code, response.content.decode("utf-8") -def make_load_model_request(data, content_type='application/json'): +def make_load_model_request(data, content_type="application/json"): headers = { - 'Content-Type': content_type + "Content-Type": content_type } response = requests.post(MODELS_URL, data=data, headers=headers) - return response.status_code, response.content.decode(encodings.utf_8.getregentry().name) + return response.status_code, response.content.decode("utf-8") def make_unload_model_request(model_name): response = requests.delete(DELETE_MODEL_URL.format(model_name)) - return response.status_code, response.content.decode(encodings.utf_8.getregentry().name) + return response.status_code, response.content.decode("utf-8") diff --git a/test/integration/local/test_container.py b/test/integration/local/test_container.py index b1e814eb..21650dd3 100644 --- a/test/integration/local/test_container.py +++ b/test/integration/local/test_container.py @@ -20,36 +20,36 @@ import pytest import requests -BASE_URL = 'http://localhost:8080/invocations' +BASE_URL = "http://localhost:8080/invocations" 
-@pytest.fixture(scope='session', autouse=True) +@pytest.fixture(scope="session", autouse=True) def volume(): try: - model_dir = os.path.abspath('test/resources/models') + model_dir = os.path.abspath("test/resources/models") subprocess.check_call( - 'docker volume create --name model_volume --opt type=none ' - '--opt device={} --opt o=bind'.format(model_dir).split()) + "docker volume create --name model_volume --opt type=none " + "--opt device={} --opt o=bind".format(model_dir).split()) yield model_dir finally: - subprocess.check_call('docker volume rm model_volume'.split()) + subprocess.check_call("docker volume rm model_volume".split()) -@pytest.fixture(scope='module', autouse=True, params=[True, False]) +@pytest.fixture(scope="module", autouse=True, params=[True, False]) def container(request, docker_base_name, tag, runtime_config): try: if request.param: - batching_config = ' -e SAGEMAKER_TFS_ENABLE_BATCHING=true' + batching_config = " -e SAGEMAKER_TFS_ENABLE_BATCHING=true" else: - batching_config = '' + batching_config = "" command = ( - 'docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080' - ' --mount type=volume,source=model_volume,target=/opt/ml/model,readonly' - ' -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info' - ' -e SAGEMAKER_BIND_TO_PORT=8080' - ' -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999' - ' {}' - ' {}:{} serve' + "docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080" + " --mount type=volume,source=model_volume,target=/opt/ml/model,readonly" + " -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info" + " -e SAGEMAKER_BIND_TO_PORT=8080" + " -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999" + " {}" + " {}:{} serve" ).format(runtime_config, batching_config, docker_base_name, tag) proc = subprocess.Popen(command.split(), stdout=sys.stdout, stderr=subprocess.STDOUT) @@ -59,7 +59,7 @@ def container(request, docker_base_name, tag, runtime_config): while attempts < 40: time.sleep(3) try: - res_code = requests.get('http://localhost:8080/ping').status_code + res_code = requests.get("http://localhost:8080/ping").status_code if res_code == 200: break except: @@ -68,126 +68,126 @@ def container(request, docker_base_name, tag, runtime_config): yield proc.pid finally: - subprocess.check_call('docker rm -f sagemaker-tensorflow-serving-test'.split()) + subprocess.check_call("docker rm -f sagemaker-tensorflow-serving-test".split()) -def make_request(data, content_type='application/json', method='predict'): +def make_request(data, content_type="application/json", method="predict"): headers = { - 'Content-Type': content_type, - 'X-Amzn-SageMaker-Custom-Attributes': - 'tfs-model-name=half_plus_three,tfs-method=%s' % method + "Content-Type": content_type, + "X-Amzn-SageMaker-Custom-Attributes": + "tfs-model-name=half_plus_three,tfs-method=%s" % method } response = requests.post(BASE_URL, data=data, headers=headers) - return json.loads(response.content.decode('utf-8')) + return json.loads(response.content.decode("utf-8")) def test_predict(): x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } y = make_request(json.dumps(x)) - assert y == {'predictions': [3.5, 4.0, 5.5]} + assert y == {"predictions": [3.5, 4.0, 5.5]} def test_predict_twice(): x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } y = make_request(json.dumps(x)) z = make_request(json.dumps(x)) - assert y == {'predictions': [3.5, 4.0, 5.5]} - assert z == {'predictions': [3.5, 4.0, 5.5]} + assert y == {"predictions": [3.5, 4.0, 5.5]} + assert z == {"predictions": [3.5, 4.0, 5.5]} def test_predict_two_instances(): x 
= { - 'instances': [[1.0, 2.0, 5.0], [1.0, 2.0, 5.0]] + "instances": [[1.0, 2.0, 5.0], [1.0, 2.0, 5.0]] } y = make_request(json.dumps(x)) - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_jsons_json_content_type(): - x = '[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]' + x = "[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]" y = make_request(x) - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_jsonlines(): - x = '[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]' - y = make_request(x, 'application/jsonlines') - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + x = "[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]" + y = make_request(x, "application/jsonlines") + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_jsons(): - x = '[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]' - y = make_request(x, 'application/jsons') - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + x = "[1.0, 2.0, 5.0]\n[1.0, 2.0, 5.0]" + y = make_request(x, "application/jsons") + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_jsons_2(): - x = '{"x": [1.0, 2.0, 5.0]}\n{"x": [1.0, 2.0, 5.0]}' + x = "{\"x\": [1.0, 2.0, 5.0]}\n{\"x\": [1.0, 2.0, 5.0]}" y = make_request(x) - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_generic_json(): x = [1.0, 2.0, 5.0] y = make_request(json.dumps(x)) - assert y == {'predictions': [[3.5, 4.0, 5.5]]} + assert y == {"predictions": [[3.5, 4.0, 5.5]]} def test_predict_generic_json_two_instances(): x = [[1.0, 2.0, 5.0], [1.0, 2.0, 5.0]] y = make_request(json.dumps(x)) - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_csv(): - x = '1.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [3.5]} + x = "1.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [3.5]} def test_predict_csv_with_zero(): - x = '0.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [3.0]} + x = "0.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [3.0]} def test_predict_csv_one_instance_three_values_with_zero(): - x = '0.0,2.0,5.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [[3.0, 4.0, 5.5]]} + x = "0.0,2.0,5.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [[3.0, 4.0, 5.5]]} def test_predict_csv_one_instance_three_values(): - x = '1.0,2.0,5.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [[3.5, 4.0, 5.5]]} + x = "1.0,2.0,5.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [[3.5, 4.0, 5.5]]} def test_predict_csv_two_instances_three_values(): - x = '1.0,2.0,5.0\n1.0,2.0,5.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} + x = "1.0,2.0,5.0\n1.0,2.0,5.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]} def test_predict_csv_three_instances(): - x = '1.0\n2.0\n5.0' - y = make_request(x, 'text/csv') - assert y == {'predictions': [3.5, 4.0, 5.5]} + x = "1.0\n2.0\n5.0" + y = make_request(x, "text/csv") + assert y == {"predictions": [3.5, 4.0, 5.5]} def test_predict_csv_wide_categorical_input(): - x = 
('0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0\n' # noqa - '0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,6.0,0.0\n') # noqa + x = ("0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0\n" # noqa + "0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,6.0,0.0\n") # noqa - y = make_request(x, 'text/csv') - predictions = y['predictions'] + y = make_request(x, "text/csv") + predictions = y["predictions"] assert 2 == len(predictions) assert 30 == len(predictions[0]) @@ -197,12 +197,12 @@ def test_predict_csv_wide_categorical_input(): def test_regress(): x = { - 'signature_name': 'tensorflow/serving/regress', - 'examples': [{'x': 1.0}, {'x': 2.0}] + "signature_name": "tensorflow/serving/regress", + "examples": [{"x": 1.0}, {"x": 2.0}] } - y = make_request(json.dumps(x), method='regress') - assert y == {'results': [3.5, 4.0]} + y = make_request(json.dumps(x), method="regress") + assert y == {"results": [3.5, 4.0]} def test_regress_one_instance(): @@ -210,48 +210,48 @@ def test_regress_one_instance(): # but it is actually 'results' # this test will catch if they change api to match docs (unlikely) x = { - 'signature_name': 'tensorflow/serving/regress', - 'examples': [{'x': 1.0}] + "signature_name": "tensorflow/serving/regress", + "examples": [{"x": 1.0}] } - y = make_request(json.dumps(x), method='regress') - assert y == {'results': [3.5]} + y = make_request(json.dumps(x), method="regress") + assert y == {"results": [3.5]} def test_predict_bad_input(): - y = make_request('whatever') - assert 'error' in y + y = make_request("whatever") + assert "error" in y def test_predict_bad_input_instances(): - x = json.dumps({'junk': 'data'}) + x = json.dumps({"junk": "data"}) y = make_request(x) - assert y['error'].startswith('Failed to process element: 0 key: junk of \'instances\' list.') + assert y["error"].startswith("Failed to process element: 0 key: junk of \'instances\' list.") def test_predict_no_custom_attributes_header(): x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } headers = { 'Content-Type': 'application/json' } response = requests.post(BASE_URL, data=json.dumps(x), headers=headers) - y = json.loads(response.content.decode('utf-8')) + y = json.loads(response.content.decode("utf-8")) - assert y == {'predictions': [3.5, 4.0, 5.5]} + assert y == {"predictions": [3.5, 4.0, 5.5]} def test_predict_with_jsonlines(): x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/jsonlines' + "Content-Type": "application/json", + "Accept": "application/jsonlines" } response = requests.post(BASE_URL, data=json.dumps(x), headers=headers) - assert response.headers['Content-Type'] == 'application/jsonlines' - assert response.content.decode('utf-8') == '{ "predictions": [3.5, 4.0, 5.5 ]}' + assert response.headers["Content-Type"] == "application/jsonlines" + assert response.content.decode("utf-8") == "{ \"predictions\": [3.5, 4.0, 5.5 ]}" diff --git a/test/integration/local/test_multi_model_endpoint.py b/test/integration/local/test_multi_model_endpoint.py index 7c1b575d..c38faee9 100644 --- a/test/integration/local/test_multi_model_endpoint.py +++ b/test/integration/local/test_multi_model_endpoint.py @@ -27,32 +27,32 @@ 
make_unload_model_request, ) -PING_URL = 'http://localhost:8080/ping' +PING_URL = "http://localhost:8080/ping" -@pytest.fixture(scope='session', autouse=True) +@pytest.fixture(scope="session", autouse=True) def volume(): try: - model_dir = os.path.abspath('test/resources/mme') + model_dir = os.path.abspath("test/resources/mme") subprocess.check_call( - 'docker volume create --name dynamic_endpoint_model_volume --opt type=none ' - '--opt device={} --opt o=bind'.format(model_dir).split()) + "docker volume create --name dynamic_endpoint_model_volume --opt type=none " + "--opt device={} --opt o=bind".format(model_dir).split()) yield model_dir finally: - subprocess.check_call('docker volume rm dynamic_endpoint_model_volume'.split()) + subprocess.check_call("docker volume rm dynamic_endpoint_model_volume".split()) -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(scope="module", autouse=True) def container(request, docker_base_name, tag, runtime_config): try: command = ( - 'docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080' - ' --mount type=volume,source=dynamic_endpoint_model_volume,target=/opt/ml/models,readonly' - ' -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info' - ' -e SAGEMAKER_BIND_TO_PORT=8080' - ' -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999' - ' -e SAGEMAKER_MULTI_MODEL=true' - ' {}:{} serve' + "docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080" + " --mount type=volume,source=dynamic_endpoint_model_volume,target=/opt/ml/models,readonly" + " -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info" + " -e SAGEMAKER_BIND_TO_PORT=8080" + " -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999" + " -e SAGEMAKER_MULTI_MODEL=true" + " {}:{} serve" ).format(runtime_config, docker_base_name, tag) proc = subprocess.Popen(command.split(), stdout=sys.stdout, stderr=subprocess.STDOUT) @@ -61,7 +61,7 @@ def container(request, docker_base_name, tag, runtime_config): while attempts < 40: time.sleep(3) try: - res_code = requests.get('http://localhost:8080/ping').status_code + res_code = requests.get("http://localhost:8080/ping").status_code if res_code == 200: break except: @@ -70,7 +70,7 @@ def container(request, docker_base_name, tag, runtime_config): yield proc.pid finally: - subprocess.check_call('docker rm -f sagemaker-tensorflow-serving-test'.split()) + subprocess.check_call("docker rm -f sagemaker-tensorflow-serving-test".split()) @pytest.mark.skip_gpu @@ -82,9 +82,9 @@ def test_ping(): @pytest.mark.skip_gpu def test_container_start_invocation_fail(): x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } - code, y = make_invocation_request(json.dumps(x), 'half_plus_three') + code, y = make_invocation_request(json.dumps(x), "half_plus_three") y = json.loads(y) assert code == 404 assert "Model half_plus_three is not loaded yet." 
in str(y) @@ -101,29 +101,29 @@ def test_list_models_empty(): @pytest.mark.skip_gpu def test_delete_unloaded_model(): # unloads the given model/version, no-op if not loaded - model_name = 'non-existing-model' + model_name = "non-existing-model" code, res = make_unload_model_request(model_name) assert code == 404 - assert 'Model {} is not loaded yet'.format(model_name) in res + assert "Model {} is not loaded yet".format(model_name) in res @pytest.mark.skip_gpu def test_delete_model(): - model_name = 'half_plus_three' + model_name = "half_plus_three" model_data = { - 'model_name': model_name, - 'url': '/opt/ml/models/half_plus_three' + "model_name": model_name, + "url": "/opt/ml/models/half_plus_three" } code, res = make_load_model_request(json.dumps(model_data)) assert code == 200 - assert 'Successfully loaded model {}'.format(model_name) in res + assert "Successfully loaded model {}".format(model_name) in res x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } _, y = make_invocation_request(json.dumps(x), model_name) y = json.loads(y) - assert y == {'predictions': [3.5, 4.0, 5.5]} + assert y == {"predictions": [3.5, 4.0, 5.5]} code_unload, res2 = make_unload_model_request(model_name) assert code_unload == 200 @@ -131,44 +131,44 @@ def test_delete_model(): code_invoke, y2 = make_invocation_request(json.dumps(x), model_name) y2 = json.loads(y2) assert code_invoke == 404 - assert 'Model {} is not loaded yet.'.format(model_name) in str(y2) + assert "Model {} is not loaded yet.".format(model_name) in str(y2) @pytest.mark.skip_gpu def test_load_two_models(): - model_name_1 = 'half_plus_two' + model_name_1 = "half_plus_two" model_data_1 = { - 'model_name': model_name_1, - 'url': '/opt/ml/models/half_plus_two' + "model_name": model_name_1, + "url": "/opt/ml/models/half_plus_two" } code1, res1 = make_load_model_request(json.dumps(model_data_1)) assert code1 == 200 - assert 'Successfully loaded model {}'.format(model_name_1) in res1 + assert "Successfully loaded model {}".format(model_name_1) in res1 # load second model - model_name_2 = 'half_plus_three' + model_name_2 = "half_plus_three" model_data_2 = { - 'model_name': model_name_2, - 'url': '/opt/ml/models/half_plus_three' + "model_name": model_name_2, + "url": "/opt/ml/models/half_plus_three" } code2, res2 = make_load_model_request(json.dumps(model_data_2)) assert code2 == 200 - assert 'Successfully loaded model {}'.format(model_name_2) in res2 + assert "Successfully loaded model {}".format(model_name_2) in res2 # make invocation request to the first model x = { - 'instances': [1.0, 2.0, 5.0] + "instances": [1.0, 2.0, 5.0] } code_invoke1, y1 = make_invocation_request(json.dumps(x), model_name_1) y1 = json.loads(y1) assert code_invoke1 == 200 - assert y1 == {'predictions': [2.5, 3.0, 4.5]} + assert y1 == {"predictions": [2.5, 3.0, 4.5]} # make invocation request to the second model - code_invoke2, y2 = make_invocation_request(json.dumps(x), 'half_plus_three') + code_invoke2, y2 = make_invocation_request(json.dumps(x), "half_plus_three") y2 = json.loads(y2) assert code_invoke2 == 200 - assert y2 == {'predictions': [3.5, 4.0, 5.5]} + assert y2 == {"predictions": [3.5, 4.0, 5.5]} code_list, res3 = make_list_model_request() res3 = json.loads(res3) @@ -177,38 +177,38 @@ def test_load_two_models(): @pytest.mark.skip_gpu def test_load_one_model_two_times(): - model_name = 'cifar' + model_name = "cifar" model_data = { - 'model_name': model_name, - 'url': '/opt/ml/models/cifar' + "model_name": model_name, + "url": "/opt/ml/models/cifar" 
} code_load, res = make_load_model_request(json.dumps(model_data)) assert code_load == 200 - assert 'Successfully loaded model {}'.format(model_name) in res + assert "Successfully loaded model {}".format(model_name) in res code_load2, res2 = make_load_model_request(json.dumps(model_data)) assert code_load2 == 409 - assert'Model {} is already loaded'.format(model_name) in res2 + assert "Model {} is already loaded".format(model_name) in res2 @pytest.mark.skip_gpu def test_load_non_existing_model(): - model_name = 'non-existing' - base_path = '/opt/ml/models/non-existing' + model_name = "non-existing" + base_path = "/opt/ml/models/non-existing" model_data = { - 'model_name': model_name, - 'url': base_path + "model_name": model_name, + "url": base_path } code, res = make_load_model_request(json.dumps(model_data)) assert code == 404 - assert 'Could not find valid base path {} for servable {}'.format(base_path, model_name) in str(res) + assert "Could not find valid base path {} for servable {}".format(base_path, model_name) in str(res) @pytest.mark.skip_gpu def test_bad_model_reqeust(): bad_model_data = { - 'model_name': 'model_name', - 'uri': '/opt/ml/models/non-existing' + "model_name": "model_name", + "uri": "/opt/ml/models/non-existing" } code, _ = make_load_model_request(json.dumps(bad_model_data)) assert code == 500 @@ -216,12 +216,12 @@ def test_bad_model_reqeust(): @pytest.mark.skip_gpu def test_invalid_model_version(): - model_name = 'invalid_version' - base_path = '/opt/ml/models/invalid_version' + model_name = "invalid_version" + base_path = "/opt/ml/models/invalid_version" invalid_model_version_data = { - 'model_name': model_name, - 'url': base_path + "model_name": model_name, + "url": base_path } code, res = make_load_model_request(json.dumps(invalid_model_version_data)) assert code == 404 - assert 'Could not find valid base path {} for servable {}'.format(base_path, model_name) in str(res) + assert "Could not find valid base path {} for servable {}".format(base_path, model_name) in str(res) diff --git a/test/integration/local/test_pre_post_processing.py b/test/integration/local/test_pre_post_processing.py index 59dacd2e..1106b0e5 100644 --- a/test/integration/local/test_pre_post_processing.py +++ b/test/integration/local/test_pre_post_processing.py @@ -22,41 +22,41 @@ import requests -PING_URL = 'http://localhost:8080/ping' -INVOCATIONS_URL = 'http://localhost:8080/invocations' +PING_URL = "http://localhost:8080/ping" +INVOCATIONS_URL = "http://localhost:8080/invocations" -@pytest.fixture(scope='module', autouse=True, params=['1', '2', '3', '4', '5']) +@pytest.fixture(scope="module", autouse=True, params=["1", "2", "3", "4", "5"]) def volume(tmpdir_factory, request): try: print(str(tmpdir_factory)) - model_dir = os.path.join(tmpdir_factory.mktemp('test'), 'model') - code_dir = os.path.join(model_dir, 'code') - test_example = 'test/resources/examples/test{}'.format(request.param) + model_dir = os.path.join(tmpdir_factory.mktemp("test"), "model") + code_dir = os.path.join(model_dir, "code") + test_example = "test/resources/examples/test{}".format(request.param) - model_src_dir = 'test/resources/models' + model_src_dir = "test/resources/models" shutil.copytree(model_src_dir, model_dir) shutil.copytree(test_example, code_dir) - volume_name = f'model_volume_{request.param}' + volume_name = f"model_volume_{request.param}" subprocess.check_call( - 'docker volume create --name {} --opt type=none ' - '--opt device={} --opt o=bind'.format(volume_name, model_dir).split()) + "docker volume 
create --name {} --opt type=none " + "--opt device={} --opt o=bind".format(volume_name, model_dir).split()) yield volume_name finally: - subprocess.check_call(f'docker volume rm {volume_name}'.split()) + subprocess.check_call(f"docker volume rm {volume_name}".split()) -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(scope="module", autouse=True) def container(volume, docker_base_name, tag, runtime_config): try: command = ( - 'docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080' - ' --mount type=volume,source={},target=/opt/ml/model,readonly' - ' -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info' - ' -e SAGEMAKER_BIND_TO_PORT=8080' - ' -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999' - ' {}:{} serve' + "docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080" + " --mount type=volume,source={},target=/opt/ml/model,readonly" + " -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info" + " -e SAGEMAKER_BIND_TO_PORT=8080" + " -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999" + " {}:{} serve" ).format(runtime_config, volume, docker_base_name, tag) proc = subprocess.Popen(command.split(), stdout=sys.stdout, stderr=subprocess.STDOUT) @@ -65,7 +65,7 @@ def container(volume, docker_base_name, tag, runtime_config): while attempts < 40: time.sleep(3) try: - res_code = requests.get('http://localhost:8080/ping').status_code + res_code = requests.get("http://localhost:8080/ping").status_code if res_code == 200: break except: @@ -74,56 +74,56 @@ def container(volume, docker_base_name, tag, runtime_config): yield proc.pid finally: - subprocess.check_call('docker rm -f sagemaker-tensorflow-serving-test'.split()) + subprocess.check_call("docker rm -f sagemaker-tensorflow-serving-test".split()) def make_headers(content_type, method): headers = { - 'Content-Type': content_type, - 'X-Amzn-SageMaker-Custom-Attributes': 'tfs-model-name=half_plus_three,tfs-method=%s' % method + "Content-Type": content_type, + "X-Amzn-SageMaker-Custom-Attributes": "tfs-model-name=half_plus_three,tfs-method=%s" % method } return headers def test_predict_json(): - headers = make_headers('application/json', 'predict') - data = '{"instances": [1.0, 2.0, 5.0]}' + headers = make_headers("application/json", "predict") + data = "{\"instances\": [1.0, 2.0, 5.0]}" response = requests.post(INVOCATIONS_URL, data=data, headers=headers).json() - assert response == {'predictions': [3.5, 4.0, 5.5]} + assert response == {"predictions": [3.5, 4.0, 5.5]} def test_zero_content(): - headers = make_headers('application/json', 'predict') - data = '' + headers = make_headers("application/json", "predict") + data = "" response = requests.post(INVOCATIONS_URL, data=data, headers=headers) assert 500 == response.status_code - assert 'document is empty' in response.text + assert "document is empty" in response.text def test_large_input(): - headers = make_headers('text/csv', 'predict') - data_file = 'test/resources/inputs/test-large.csv' + headers = make_headers("text/csv", "predict") + data_file = "test/resources/inputs/test-large.csv" - with open(data_file, 'r') as file: + with open(data_file, "r") as file: large_data = file.read() response = requests.post(INVOCATIONS_URL, data=large_data, headers=headers).json() - predictions = response['predictions'] + predictions = response["predictions"] assert len(predictions) == 753936 def test_csv_input(): - headers = make_headers('text/csv', 'predict') - data = '1.0,2.0,5.0' + headers = make_headers("text/csv", "predict") + data = "1.0,2.0,5.0" response = requests.post(INVOCATIONS_URL, data=data, headers=headers).json() - 
assert response == {'predictions': [3.5, 4.0, 5.5]} + assert response == {"predictions": [3.5, 4.0, 5.5]} def test_unsupported_content_type(): - headers = make_headers('unsupported-type', 'predict') - data = 'aW1hZ2UgYnl0ZXM=' + headers = make_headers("unsupported-type", "predict") + data = "aW1hZ2UgYnl0ZXM=" response = requests.post(INVOCATIONS_URL, data=data, headers=headers) assert 500 == response.status_code - assert 'unsupported content type' in response.text + assert "unsupported content type" in response.text def test_ping_service(): diff --git a/test/integration/local/test_pre_post_processing_mme.py b/test/integration/local/test_pre_post_processing_mme.py index e6209975..6a7d1b45 100644 --- a/test/integration/local/test_pre_post_processing_mme.py +++ b/test/integration/local/test_pre_post_processing_mme.py @@ -25,43 +25,43 @@ from multi_model_endpoint_test_utils import make_load_model_request, make_headers -PING_URL = 'http://localhost:8080/ping' -INVOCATION_URL = 'http://localhost:8080/models/{}/invoke' -MODEL_NAME = 'half_plus_three' +PING_URL = "http://localhost:8080/ping" +INVOCATION_URL = "http://localhost:8080/models/{}/invoke" +MODEL_NAME = "half_plus_three" -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(scope="module", autouse=True) def volume(tmpdir_factory, request): try: print(str(tmpdir_factory)) - model_dir = os.path.join(tmpdir_factory.mktemp('test'), 'model') - code_dir = os.path.join(model_dir, 'code') - test_example = 'test/resources/examples/test1' + model_dir = os.path.join(tmpdir_factory.mktemp("test"), "model") + code_dir = os.path.join(model_dir, "code") + test_example = "test/resources/examples/test1" - model_src_dir = 'test/resources/models' + model_src_dir = "test/resources/models" shutil.copytree(model_src_dir, model_dir) shutil.copytree(test_example, code_dir) - volume_name = f'model_volume_1' + volume_name = f"model_volume_1" subprocess.check_call( - 'docker volume create --name {} --opt type=none ' - '--opt device={} --opt o=bind'.format(volume_name, model_dir).split()) + "docker volume create --name {} --opt type=none " + "--opt device={} --opt o=bind".format(volume_name, model_dir).split()) yield volume_name finally: - subprocess.check_call(f'docker volume rm {volume_name}'.split()) + subprocess.check_call(f"docker volume rm {volume_name}".split()) -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(scope="module", autouse=True) def container(volume, docker_base_name, tag, runtime_config): try: command = ( - 'docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080' - ' --mount type=volume,source={},target=/opt/ml/models/half_plus_three/model,readonly' - ' -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info' - ' -e SAGEMAKER_BIND_TO_PORT=8080' - ' -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999' - ' -e SAGEMAKER_MULTI_MODEL=True' - ' {}:{} serve' + "docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080" + " --mount type=volume,source={},target=/opt/ml/models/half_plus_three/model,readonly" + " -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info" + " -e SAGEMAKER_BIND_TO_PORT=8080" + " -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999" + " -e SAGEMAKER_MULTI_MODEL=True" + " {}:{} serve" ).format(runtime_config, volume, docker_base_name, tag) proc = subprocess.Popen(command.split(), stdout=sys.stdout, stderr=subprocess.STDOUT) @@ -70,7 +70,7 @@ def container(volume, docker_base_name, tag, runtime_config): while attempts < 40: time.sleep(3) try: - res_code = requests.get('http://localhost:8080/ping').status_code + res_code = 
requests.get("http://localhost:8080/ping").status_code if res_code == 200: break except: @@ -79,14 +79,14 @@ def container(volume, docker_base_name, tag, runtime_config): yield proc.pid finally: - subprocess.check_call('docker rm -f sagemaker-tensorflow-serving-test'.split()) + subprocess.check_call("docker rm -f sagemaker-tensorflow-serving-test".split()) @pytest.fixture def model(): model_data = { - 'model_name': MODEL_NAME, - 'url': '/opt/ml/models/half_plus_three/model/half_plus_three' + "model_name": MODEL_NAME, + "url": "/opt/ml/models/half_plus_three/model/half_plus_three" } make_load_model_request(json.dumps(model_data)) return MODEL_NAME @@ -101,44 +101,44 @@ def test_ping_service(): @pytest.mark.skip_gpu def test_predict_json(model): headers = make_headers() - data = '{"instances": [1.0, 2.0, 5.0]}' + data = "{\"instances\": [1.0, 2.0, 5.0]}" response = requests.post(INVOCATION_URL.format(model), data=data, headers=headers).json() - assert response == {'predictions': [3.5, 4.0, 5.5]} + assert response == {"predictions": [3.5, 4.0, 5.5]} @pytest.mark.skip_gpu def test_zero_content(): headers = make_headers() - x = '' + x = "" response = requests.post(INVOCATION_URL.format(MODEL_NAME), data=x, headers=headers) assert 500 == response.status_code - assert 'document is empty' in response.text + assert "document is empty" in response.text @pytest.mark.skip_gpu def test_large_input(): - data_file = 'test/resources/inputs/test-large.csv' + data_file = "test/resources/inputs/test-large.csv" - with open(data_file, 'r') as file: + with open(data_file, "r") as file: x = file.read() - headers = make_headers(content_type='text/csv') + headers = make_headers(content_type="text/csv") response = requests.post(INVOCATION_URL.format(MODEL_NAME), data=x, headers=headers).json() - predictions = response['predictions'] + predictions = response["predictions"] assert len(predictions) == 753936 @pytest.mark.skip_gpu def test_csv_input(): - headers = make_headers(content_type='text/csv') - data = '1.0,2.0,5.0' + headers = make_headers(content_type="text/csv") + data = "1.0,2.0,5.0" response = requests.post(INVOCATION_URL.format(MODEL_NAME), data=data, headers=headers).json() - assert response == {'predictions': [3.5, 4.0, 5.5]} + assert response == {"predictions": [3.5, 4.0, 5.5]} @pytest.mark.skip_gpu def test_unsupported_content_type(): - headers = make_headers('unsupported-type', 'predict') - data = 'aW1hZ2UgYnl0ZXM=' + headers = make_headers("unsupported-type", "predict") + data = "aW1hZ2UgYnl0ZXM=" response = requests.post(INVOCATION_URL.format(MODEL_NAME), data=data, headers=headers) assert 500 == response.status_code - assert 'unsupported content type' in response.text + assert "unsupported content type" in response.text diff --git a/test/integration/local/test_tfs_batching.py b/test/integration/local/test_tfs_batching.py index 2f952124..54d893b7 100644 --- a/test/integration/local/test_tfs_batching.py +++ b/test/integration/local/test_tfs_batching.py @@ -19,41 +19,41 @@ import pytest -@pytest.fixture(scope='session', autouse=True) +@pytest.fixture(scope="session", autouse=True) def volume(): try: - model_dir = os.path.abspath('test/resources/models') + model_dir = os.path.abspath("test/resources/models") subprocess.check_call( - 'docker volume create --name batching_model_volume --opt type=none ' - '--opt device={} --opt o=bind'.format(model_dir).split()) + "docker volume create --name batching_model_volume --opt type=none " + "--opt device={} --opt o=bind".format(model_dir).split()) yield 
model_dir
     finally:
-        subprocess.check_call('docker volume rm batching_model_volume'.split())
+        subprocess.check_call("docker volume rm batching_model_volume".split())
 def test_run_tfs_with_batching_parameters(docker_base_name, tag, runtime_config):
     try:
         command = (
-            'docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080'
-            ' --mount type=volume,source=batching_model_volume,target=/opt/ml/model,readonly'
-            ' -e SAGEMAKER_TFS_ENABLE_BATCHING=true'
-            ' -e SAGEMAKER_TFS_MAX_BATCH_SIZE=16'
-            ' -e SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS=500'
-            ' -e SAGEMAKER_TFS_NUM_BATCH_THREADS=100'
-            ' -e SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES=1'
-            ' -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info'
-            ' -e SAGEMAKER_BIND_TO_PORT=8080'
-            ' -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999'
-            ' {}:{} serve'
+            "docker run {}--name sagemaker-tensorflow-serving-test -p 8080:8080"
+            " --mount type=volume,source=batching_model_volume,target=/opt/ml/model,readonly"
+            " -e SAGEMAKER_TFS_ENABLE_BATCHING=true"
+            " -e SAGEMAKER_TFS_MAX_BATCH_SIZE=16"
+            " -e SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS=500"
+            " -e SAGEMAKER_TFS_NUM_BATCH_THREADS=100"
+            " -e SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES=1"
+            " -e SAGEMAKER_TFS_NGINX_LOGLEVEL=info"
+            " -e SAGEMAKER_BIND_TO_PORT=8080"
+            " -e SAGEMAKER_SAFE_PORT_RANGE=9000-9999"
+            " {}:{} serve"
         ).format(runtime_config, docker_base_name, tag)
         proc = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
         lines_seen = {
-            'max_batch_size { value: 16 }': 0,
-            'batch_timeout_micros { value: 500 }': 0,
-            'num_batch_threads { value: 100 }': 0,
-            'max_enqueued_batches { value: 1 }': 0
+            "max_batch_size { value: 16 }": 0,
+            "batch_timeout_micros { value: 500 }": 0,
+            "num_batch_threads { value: 100 }": 0,
+            "max_enqueued_batches { value: 1 }": 0
         }
         for stdout_line in iter(proc.stdout.readline, ""):
@@ -67,4 +67,4 @@ def test_run_tfs_with_batching_parameters(docker_base_name, tag, runtime_config)
             break
     finally:
-        subprocess.check_call('docker rm -f sagemaker-tensorflow-serving-test'.split())
+        subprocess.check_call("docker rm -f sagemaker-tensorflow-serving-test".split())
diff --git a/test/integration/sagemaker/conftest.py b/test/integration/sagemaker/conftest.py
index d4e3083d..5009979a 100644
--- a/test/integration/sagemaker/conftest.py
+++ b/test/integration/sagemaker/conftest.py
@@ -19,113 +19,113 @@
 # these regions have some p2 and p3 instances, but not enough for automated testing
 NO_P2_REGIONS = [
-    'ca-central-1',
-    'eu-central-1',
-    'eu-west-2',
-    'us-west-1',
-    'eu-west-3',
-    'eu-north-1',
-    'sa-east-1',
-    'ap-east-1',
-    'me-south-1'
+    "ca-central-1",
+    "eu-central-1",
+    "eu-west-2",
+    "us-west-1",
+    "eu-west-3",
+    "eu-north-1",
+    "sa-east-1",
+    "ap-east-1",
+    "me-south-1"
 ]
 NO_P3_REGIONS = [
-    'ap-southeast-1',
-    'ap-southeast-2',
-    'ap-south-1',
-    'ca-central-1',
-    'eu-central-1',
-    'eu-west-2',
-    'us-west-1',
-    'eu-west-3',
-    'eu-north-1',
-    'sa-east-1',
-    'ap-east-1',
-    'me-south-1'
+    "ap-southeast-1",
+    "ap-southeast-2",
+    "ap-south-1",
+    "ca-central-1",
+    "eu-central-1",
+    "eu-west-2",
+    "us-west-1",
+    "eu-west-3",
+    "eu-north-1",
+    "sa-east-1",
+    "ap-east-1",
+    "me-south-1"
 ]
 def pytest_addoption(parser):
-    parser.addoption('--region', default='us-west-2')
-    parser.addoption('--registry')
-    parser.addoption('--repo')
-    parser.addoption('--versions')
-    parser.addoption('--instance-types')
-    parser.addoption('--accelerator-type')
-    parser.addoption('--tag')
+    parser.addoption("--region", default="us-west-2")
+    parser.addoption("--registry")
+    parser.addoption("--repo")
+
parser.addoption("--versions") + parser.addoption("--instance-types") + parser.addoption("--accelerator-type") + parser.addoption("--tag") def pytest_configure(config): - os.environ['TEST_REGION'] = config.getoption('--region') - os.environ['TEST_VERSIONS'] = config.getoption('--versions') or '1.11.1,1.12.0,1.13.0' - os.environ['TEST_INSTANCE_TYPES'] = (config.getoption('--instance-types') or - 'ml.m5.xlarge,ml.p2.xlarge') + os.environ["TEST_REGION"] = config.getoption("--region") + os.environ["TEST_VERSIONS"] = config.getoption("--versions") or "1.11.1,1.12.0,1.13.0" + os.environ["TEST_INSTANCE_TYPES"] = (config.getoption("--instance-types") or + "ml.m5.xlarge,ml.p2.xlarge") - os.environ['TEST_EI_VERSIONS'] = config.getoption('--versions') or '1.11,1.12' - os.environ['TEST_EI_INSTANCE_TYPES'] = (config.getoption('--instance-types') or - 'ml.m5.xlarge') + os.environ["TEST_EI_VERSIONS"] = config.getoption("--versions") or "1.11,1.12" + os.environ["TEST_EI_INSTANCE_TYPES"] = (config.getoption("--instance-types") or + "ml.m5.xlarge") - if config.getoption('--tag'): - os.environ['TEST_VERSIONS'] = config.getoption('--tag') - os.environ['TEST_EI_VERSIONS'] = config.getoption('--tag') + if config.getoption("--tag"): + os.environ["TEST_VERSIONS"] = config.getoption("--tag") + os.environ["TEST_EI_VERSIONS"] = config.getoption("--tag") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def region(request): - return request.config.getoption('--region') + return request.config.getoption("--region") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def registry(request, region): - if request.config.getoption('--registry'): - return request.config.getoption('--registry') + if request.config.getoption("--registry"): + return request.config.getoption("--registry") sts = boto3.client( - 'sts', + "sts", region_name=region, - endpoint_url='https://sts.{}.amazonaws.com'.format(region) + endpoint_url="https://sts.{}.amazonaws.com".format(region) ) - return sts.get_caller_identity()['Account'] + return sts.get_caller_identity()["Account"] -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def boto_session(region): return boto3.Session(region_name=region) -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def sagemaker_client(boto_session): - return boto_session.client('sagemaker') + return boto_session.client("sagemaker") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def sagemaker_runtime_client(boto_session): - return boto_session.client('runtime.sagemaker') + return boto_session.client("runtime.sagemaker") def unique_name_from_base(base, max_length=63): - unique = '%04x' % random.randrange(16 ** 4) # 4-digit hex + unique = "%04x" % random.randrange(16 ** 4) # 4-digit hex ts = str(int(time.time())) available_length = max_length - 2 - len(ts) - len(unique) trimmed = base[:available_length] - return '{}-{}-{}'.format(trimmed, ts, unique) + return "{}-{}-{}".format(trimmed, ts, unique) @pytest.fixture def model_name(): - return unique_name_from_base('test-tfs') + return unique_name_from_base("test-tfs") @pytest.fixture(autouse=True) def skip_gpu_instance_restricted_regions(region, instance_type): - if (region in NO_P2_REGIONS and instance_type.startswith('ml.p2')) or \ - (region in NO_P3_REGIONS and instance_type.startswith('ml.p3')): - pytest.skip('Skipping GPU test in region {}'.format(region)) + if (region in NO_P2_REGIONS and instance_type.startswith("ml.p2")) or \ + (region in NO_P3_REGIONS and 
instance_type.startswith("ml.p3")):
+        pytest.skip("Skipping GPU test in region {}".format(region))
 @pytest.fixture(autouse=True)
 def skip_by_device_type(request, instance_type):
-    is_gpu = instance_type[3] in ['g', 'p']
-    if (request.node.get_closest_marker('skip_gpu') and is_gpu) or \
-            (request.node.get_closest_marker('skip_cpu') and not is_gpu):
-        pytest.skip('Skipping because running on \'{}\' instance'.format(instance_type))
\ No newline at end of file
+    is_gpu = instance_type[3] in ["g", "p"]
+    if (request.node.get_closest_marker("skip_gpu") and is_gpu) or \
+            (request.node.get_closest_marker("skip_cpu") and not is_gpu):
+        pytest.skip("Skipping because running on '{}' instance".format(instance_type))
diff --git a/test/integration/sagemaker/test_ei.py b/test/integration/sagemaker/test_ei.py
index e91bad11..c7244e80 100644
--- a/test/integration/sagemaker/test_ei.py
+++ b/test/integration/sagemaker/test_ei.py
@@ -17,23 +17,23 @@ import util
-EI_SUPPORTED_REGIONS = ['us-east-1', 'us-east-2', 'us-west-2',
-                        'eu-west-1', 'ap-northeast-1', 'ap-northeast-2']
+EI_SUPPORTED_REGIONS = ["us-east-1", "us-east-2", "us-west-2",
+                        "eu-west-1", "ap-northeast-1", "ap-northeast-2"]
-@pytest.fixture(params=os.environ['TEST_EI_VERSIONS'].split(','))
+@pytest.fixture(params=os.environ["TEST_EI_VERSIONS"].split(","))
 def version(request):
     return request.param
 @pytest.fixture
 def repo(request):
-    return request.config.getoption('--repo') or 'sagemaker-tensorflow-serving-eia'
+    return request.config.getoption("--repo") or "sagemaker-tensorflow-serving-eia"
 @pytest.fixture
 def tag(request, version):
-    return request.config.getoption('--tag') or f'{version}-cpu'
+    return request.config.getoption("--tag") or f"{version}-cpu"
 @pytest.fixture
@@ -41,37 +41,37 @@ def image_uri(registry, region, repo, tag):
     return util.image_uri(registry, region, repo, tag)
-@pytest.fixture(params=os.environ['TEST_EI_INSTANCE_TYPES'].split(','))
+@pytest.fixture(params=os.environ["TEST_EI_INSTANCE_TYPES"].split(","))
 def instance_type(request, region):
     return request.param
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def accelerator_type(request):
-    return request.config.getoption('--accelerator-type') or 'ml.eia1.medium'
+    return request.config.getoption("--accelerator-type") or "ml.eia1.medium"
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
 def model_data(region):
-    return ('s3://sagemaker-sample-data-{}/tensorflow/model'
-            '/resnet/resnet_50_v2_fp32_NCHW.tar.gz').format(region)
+    return ("s3://sagemaker-sample-data-{}/tensorflow/model"
+            "/resnet/resnet_50_v2_fp32_NCHW.tar.gz").format(region)
 @pytest.fixture
 def input_data():
-    return {'instances': [[[[random.random() for _ in range(3)] for _ in range(3)]]]}
+    return {"instances": [[[[random.random() for _ in range(3)] for _ in range(3)]]]}
 @pytest.fixture
 def skip_if_no_accelerator(accelerator_type):
     if accelerator_type is None:
-        pytest.skip('Skipping because accelerator type was not provided')
+        pytest.skip("Skipping because accelerator type was not provided")
 @pytest.fixture
 def skip_if_non_supported_ei_region(region):
     if region not in EI_SUPPORTED_REGIONS:
-        pytest.skip('EI is not supported in {}'.format(region))
+        pytest.skip("EI is not supported in {}".format(region))
 @pytest.mark.skip_if_non_supported_ei_region()
diff --git a/test/integration/sagemaker/test_tfs.py b/test/integration/sagemaker/test_tfs.py
index 436351d8..f73ce35c 100644
--- a/test/integration/sagemaker/test_tfs.py
+++ b/test/integration/sagemaker/test_tfs.py
@@
-16,27 +16,27 @@ import util -NON_P3_REGIONS = ['ap-southeast-1', 'ap-southeast-2', 'ap-south-1', - 'ca-central-1', 'eu-central-1', 'eu-west-2', 'us-west-1'] +NON_P3_REGIONS = ["ap-southeast-1", "ap-southeast-2", "ap-south-1", + "ca-central-1", "eu-central-1", "eu-west-2", "us-west-1"] -@pytest.fixture(params=os.environ['TEST_VERSIONS'].split(',')) +@pytest.fixture(params=os.environ["TEST_VERSIONS"].split(",")) def version(request): return request.param -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def repo(request): - return request.config.getoption('--repo') or 'sagemaker-tensorflow-serving' + return request.config.getoption("--repo") or "sagemaker-tensorflow-serving" @pytest.fixture def tag(request, version, instance_type): - if request.config.getoption('--tag'): - return request.config.getoption('--tag') + if request.config.getoption("--tag"): + return request.config.getoption("--tag") - arch = 'gpu' if instance_type.startswith('ml.p') else 'cpu' - return f'{version}-{arch}' + arch = "gpu" if instance_type.startswith("ml.p") else "cpu" + return f"{version}-{arch}" @pytest.fixture @@ -44,41 +44,41 @@ def image_uri(registry, region, repo, tag): return util.image_uri(registry, region, repo, tag) -@pytest.fixture(params=os.environ['TEST_INSTANCE_TYPES'].split(',')) +@pytest.fixture(params=os.environ["TEST_INSTANCE_TYPES"].split(",")) def instance_type(request, region): return request.param -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def accelerator_type(): return None -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def tfs_model(region, boto_session): return util.find_or_put_model_data(region, boto_session, - 'test/data/tfs-model.tar.gz') + "test/data/tfs-model.tar.gz") @pytest.fixture(scope='session') def python_model_with_requirements(region, boto_session): return util.find_or_put_model_data(region, boto_session, - 'test/data/python-with-requirements.tar.gz') + "test/data/python-with-requirements.tar.gz") @pytest.fixture(scope='session') def python_model_with_lib(region, boto_session): return util.find_or_put_model_data(region, boto_session, - 'test/data/python-with-lib.tar.gz') + "test/data/python-with-lib.tar.gz") def test_tfs_model(boto_session, sagemaker_client, sagemaker_runtime_client, model_name, tfs_model, image_uri, instance_type, accelerator_type): - input_data = {'instances': [1.0, 2.0, 5.0]} + input_data = {"instances": [1.0, 2.0, 5.0]} util.create_and_invoke_endpoint(boto_session, sagemaker_client, sagemaker_runtime_client, model_name, tfs_model, image_uri, instance_type, accelerator_type, input_data) @@ -104,34 +104,34 @@ def test_python_model_with_requirements(boto_session, sagemaker_client, python_model_with_requirements, image_uri, instance_type, accelerator_type): - if 'p3' in instance_type: - pytest.skip('skip for p3 instance') + if "p3" in instance_type: + pytest.skip("skip for p3 instance") # the python service needs to transform this to get a valid prediction - input_data = {'x': [1.0, 2.0, 5.0]} + input_data = {"x": [1.0, 2.0, 5.0]} output_data = util.create_and_invoke_endpoint(boto_session, sagemaker_client, sagemaker_runtime_client, model_name, python_model_with_requirements, image_uri, instance_type, accelerator_type, input_data) # python service adds this to tfs response - assert output_data['python'] is True - assert output_data['pillow'] == '6.0.0' + assert output_data["python"] is True + assert output_data["pillow"] == "6.0.0" def test_python_model_with_lib(boto_session, sagemaker_client, 
sagemaker_runtime_client, model_name, python_model_with_lib, image_uri, instance_type, accelerator_type): - if 'p3' in instance_type: - pytest.skip('skip for p3 instance') + if "p3" in instance_type: + pytest.skip("skip for p3 instance") # the python service needs to transform this to get a valid prediction - input_data = {'x': [1.0, 2.0, 5.0]} + input_data = {"x": [1.0, 2.0, 5.0]} output_data = util.create_and_invoke_endpoint(boto_session, sagemaker_client, sagemaker_runtime_client, model_name, python_model_with_lib, image_uri, instance_type, accelerator_type, input_data) # python service adds this to tfs response - assert output_data['python'] is True - assert output_data['dummy_module'] == '0.1' + assert output_data["python"] is True + assert output_data["dummy_module"] == "0.1" diff --git a/test/integration/sagemaker/util.py b/test/integration/sagemaker/util.py index 9118fad7..f5247b17 100644 --- a/test/integration/sagemaker/util.py +++ b/test/integration/sagemaker/util.py @@ -20,15 +20,15 @@ import time logger = logging.getLogger(__name__) -BATCH_CSV = os.path.join('test', 'data', 'batch.csv') +BATCH_CSV = os.path.join("test", "data", "batch.csv") def image_uri(registry, region, repo, tag): - return f'{registry}.dkr.ecr.{region}.amazonaws.com/{repo}:{tag}' + return f"{registry}.dkr.ecr.{region}.amazonaws.com/{repo}:{tag}" def _execution_role(boto_session): - return boto_session.resource('iam').Role('SageMakerRole').arn + return boto_session.resource("iam").Role("SageMakerRole").arn @contextlib.contextmanager @@ -37,81 +37,81 @@ def sagemaker_model(boto_session, sagemaker_client, image_uri, model_name, model ModelName=model_name, ExecutionRoleArn=_execution_role(boto_session), PrimaryContainer={ - 'Image': image_uri, - 'ModelDataUrl': model_data + "Image": image_uri, + "ModelDataUrl": model_data }) try: yield model finally: - logger.info('deleting model %s', model_name) + logger.info("deleting model %s", model_name) sagemaker_client.delete_model(ModelName=model_name) def _production_variants(model_name, instance_type, accelerator_type): production_variants = [{ - 'VariantName': 'AllTraffic', - 'ModelName': model_name, - 'InitialInstanceCount': 1, - 'InstanceType': instance_type + "VariantName": "AllTraffic", + "ModelName": model_name, + "InitialInstanceCount": 1, + "InstanceType": instance_type }] if accelerator_type: - production_variants[0]['AcceleratorType'] = accelerator_type + production_variants[0]["AcceleratorType"] = accelerator_type return production_variants def _test_bucket(region, boto_session): sts = boto_session.client( - 'sts', + "sts", region_name=region, - endpoint_url='https://sts.{}.amazonaws.com'.format(region) + endpoint_url="https://sts.{}.amazonaws.com".format(region) ) - account = sts.get_caller_identity()['Account'] - return f'sagemaker-{region}-{account}' + account = sts.get_caller_identity()["Account"] + return f"sagemaker-{region}-{account}" def find_or_put_model_data(region, boto_session, local_path): model_file = os.path.basename(local_path) bucket = _test_bucket(region, boto_session) - key = f'test-tfs/{model_file}' + key = f"test-tfs/{model_file}" - s3 = boto_session.client('s3', region) + s3 = boto_session.client("s3", region) try: s3.head_bucket(Bucket=bucket) except botocore.exceptions.ClientError as e: - if e.response['Error']['Code'] != '404': + if e.response["Error"]["Code"] != "404": raise # bucket doesn't exist, create it - if region == 'us-east-1': + if region == "us-east-1": s3.create_bucket(Bucket=bucket) else: s3.create_bucket(Bucket=bucket, 
- CreateBucketConfiguration={'LocationConstraint': region}) + CreateBucketConfiguration={"LocationConstraint": region}) try: s3.head_object(Bucket=bucket, Key=key) except botocore.exceptions.ClientError as e: - if e.response['Error']['Code'] != '404': + if e.response["Error"]["Code"] != "404": raise # file doesn't exist - upload it s3.upload_file(local_path, bucket, key) - return f's3://{bucket}/{key}' + return f"s3://{bucket}/{key}" @contextlib.contextmanager def sagemaker_endpoint(sagemaker_client, model_name, instance_type, accelerator_type=None): - logger.info('creating endpoint %s', model_name) + logger.info("creating endpoint %s", model_name) # Add jitter so we can run tests in parallel without running into service side limits. delay = round(random.random()*5, 3) - logger.info('waiting for {} seconds'.format(delay)) + logger.info("waiting for {} seconds".format(delay)) time.sleep(delay) production_variants = _production_variants(model_name, instance_type, accelerator_type) @@ -121,74 +121,74 @@ def sagemaker_endpoint(sagemaker_client, model_name, instance_type, accelerator_ sagemaker_client.create_endpoint(EndpointName=model_name, EndpointConfigName=model_name) try: - sagemaker_client.get_waiter('endpoint_in_service').wait(EndpointName=model_name) + sagemaker_client.get_waiter("endpoint_in_service").wait(EndpointName=model_name) finally: - status = sagemaker_client.describe_endpoint(EndpointName=model_name)['EndpointStatus'] - if status != 'InService': - raise ValueError(f'failed to create endpoint {model_name}') + status = sagemaker_client.describe_endpoint(EndpointName=model_name)["EndpointStatus"] + if status != "InService": + raise ValueError(f"failed to create endpoint {model_name}") try: yield model_name # return the endpoint name finally: - logger.info('deleting endpoint and endpoint config %s', model_name) + logger.info("deleting endpoint and endpoint config %s", model_name) sagemaker_client.delete_endpoint(EndpointName=model_name) sagemaker_client.delete_endpoint_config(EndpointConfigName=model_name) def _create_transform_job_request(model_name, batch_output, batch_input, instance_type): return { - 'TransformJobName': model_name, - 'ModelName': model_name, - 'BatchStrategy': 'MultiRecord', - 'TransformOutput': { - 'S3OutputPath': batch_output + "TransformJobName": model_name, + "ModelName": model_name, + "BatchStrategy": "MultiRecord", + "TransformOutput": { + "S3OutputPath": batch_output }, - 'TransformInput': { - 'DataSource': { - 'S3DataSource': { - 'S3DataType': 'S3Prefix', - 'S3Uri': batch_input + "TransformInput": { + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": batch_input } }, - 'ContentType': 'text/csv', - 'SplitType': 'Line', - 'CompressionType': 'None' + "ContentType": "text/csv", + "SplitType": "Line", + "CompressionType": "None" }, - 'TransformResources': { - 'InstanceType': instance_type, - 'InstanceCount': 1 + "TransformResources": { + "InstanceType": instance_type, + "InstanceCount": 1 } } def _read_batch_output(region, boto_session, bucket, model_name): - s3 = boto_session.client('s3', region) - output_file = f'/tmp/{model_name}.out' - s3.download_file(bucket, f'output/{model_name}/batch.csv.out', output_file) - return json.loads(open(output_file, 'r').read())['predictions'] + s3 = boto_session.client("s3", region) + output_file = f"/tmp/{model_name}.out" + s3.download_file(bucket, f"output/{model_name}/batch.csv.out", output_file) + return json.loads(open(output_file, "r").read())["predictions"] def 
_wait_for_transform_job(region, boto_session, sagemaker_client, model_name, poll, timeout): - status = sagemaker_client.describe_transform_job(TransformJobName=model_name)['TransformJobStatus'] + status = sagemaker_client.describe_transform_job(TransformJobName=model_name)["TransformJobStatus"] job_runtime = 0 - while status == 'InProgress': + while status == "InProgress": - logger.info(f'Waiting for batch transform job {model_name} to finish') + logger.info(f"Waiting for batch transform job {model_name} to finish") time.sleep(poll) job_runtime += poll if job_runtime > timeout: - raise ValueError(f'Batch transform job {model_name} exceeded maximum runtime {timeout} seconds') + raise ValueError(f"Batch transform job {model_name} exceeded maximum runtime {timeout} seconds") - status = sagemaker_client.describe_transform_job(TransformJobName=model_name)['TransformJobStatus'] - if status == 'Completed': + status = sagemaker_client.describe_transform_job(TransformJobName=model_name)["TransformJobStatus"] + if status == "Completed": return _read_batch_output(region=region, boto_session=boto_session, bucket=_test_bucket(region, boto_session), model_name=model_name) - if status == 'Failed': - raise ValueError(f'Failed to execute batch transform job {model_name}') - if status in ['Stopped', 'Stopping']: - raise ValueError(f'Batch transform job {model_name} was stopped') + if status == "Failed": + raise ValueError(f"Failed to execute batch transform job {model_name}") + if status in ["Stopped", "Stopping"]: + raise ValueError(f"Batch transform job {model_name} was stopped") def run_batch_transform_job(region, boto_session, model_data, image_uri, @@ -198,7 +198,7 @@ def run_batch_transform_job(region, boto_session, model_data, image_uri, with sagemaker_model(boto_session, sagemaker_client, image_uri, model_name, model_data): batch_input = find_or_put_model_data(region, boto_session, BATCH_CSV) bucket = _test_bucket(region, boto_session) - batch_output = f's3://{bucket}/output/{model_name}' + batch_output = f"s3://{bucket}/output/{model_name}" request = _create_transform_job_request( model_name=model_name, batch_input=batch_input, @@ -216,10 +216,10 @@ def run_batch_transform_job(region, boto_session, model_data, image_uri, def invoke_endpoint(sagemaker_runtime_client, endpoint_name, input_data): response = sagemaker_runtime_client.invoke_endpoint(EndpointName=endpoint_name, - ContentType='application/json', + ContentType="application/json", Body=json.dumps(input_data)) - result = json.loads(response['Body'].read().decode()) - assert result['predictions'] is not None + result = json.loads(response["Body"].read().decode()) + assert result["predictions"] is not None return result