Skip to content

feat: Support for ModelBuilder In_Process Mode (1/2) #4784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/sagemaker/serve/builder/model_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from sagemaker.serve.mode.function_pointers import Mode
from sagemaker.serve.mode.sagemaker_endpoint_mode import SageMakerEndpointMode
from sagemaker.serve.mode.local_container_mode import LocalContainerMode
from sagemaker.serve.mode.in_process_mode import InProcessMode
from sagemaker.serve.detector.pickler import save_pkl, save_xgboost
from sagemaker.serve.builder.serve_settings import _ServeSettings
from sagemaker.serve.builder.djl_builder import DJL
Expand Down Expand Up @@ -410,7 +411,7 @@ def _prepare_for_mode(
)
self.env_vars.update(env_vars_sagemaker)
return self.s3_upload_path, env_vars_sagemaker
if self.mode == Mode.LOCAL_CONTAINER:
elif self.mode == Mode.LOCAL_CONTAINER:
# init the LocalContainerMode object
self.modes[str(Mode.LOCAL_CONTAINER)] = LocalContainerMode(
inference_spec=self.inference_spec,
Expand All @@ -422,9 +423,22 @@ def _prepare_for_mode(
)
self.modes[str(Mode.LOCAL_CONTAINER)].prepare()
return None
elif self.mode == Mode.IN_PROCESS:
# init the InProcessMode object
self.modes[str(Mode.IN_PROCESS)] = InProcessMode(
inference_spec=self.inference_spec,
schema_builder=self.schema_builder,
session=self.sagemaker_session,
model_path=self.model_path,
env_vars=self.env_vars,
model_server=self.model_server,
)
self.modes[str(Mode.IN_PROCESS)].prepare()
return None

raise ValueError(
"Please specify mode in: %s, %s" % (Mode.LOCAL_CONTAINER, Mode.SAGEMAKER_ENDPOINT)
"Please specify mode in: %s, %s, %s"
% (Mode.LOCAL_CONTAINER, Mode.SAGEMAKER_ENDPOINT, Mode.IN_PROCESS)
)

def _get_client_translators(self):
Expand Down Expand Up @@ -603,10 +617,12 @@ def _overwrite_mode_in_deploy(self, overwrite_mode: str):
s3_upload_path, env_vars_sagemaker = self._prepare_for_mode()
self.pysdk_model.model_data = s3_upload_path
self.pysdk_model.env.update(env_vars_sagemaker)

elif overwrite_mode == Mode.LOCAL_CONTAINER:
self.mode = self.pysdk_model.mode = Mode.LOCAL_CONTAINER
self._prepare_for_mode()
elif overwrite_mode == Mode.IN_PROCESS:
self.mode = self.pysdk_model.mode = Mode.IN_PROCESS
self._prepare_for_mode()
else:
raise ValueError("Mode %s is not supported!" % overwrite_mode)

Expand Down Expand Up @@ -796,9 +812,10 @@ def _initialize_for_mlflow(self, artifact_path: str) -> None:
self.dependencies.update({"requirements": mlflow_model_dependency_path})

# Model Builder is a class to build the model for deployment.
# It supports two modes of deployment
# It supports three modes of deployment
# 1/ SageMaker Endpoint
# 2/ Local launch with container
# 3/ In process mode with Transformers server in beta release
def build( # pylint: disable=R0911
self,
mode: Type[Mode] = None,
Expand Down Expand Up @@ -896,8 +913,15 @@ def build( # pylint: disable=R0911

def _build_validations(self):
"""Validations needed for model server overrides, or auto-detection or fallback"""
if self.mode == Mode.IN_PROCESS:
raise ValueError("IN_PROCESS mode is not supported yet!")
if self.mode == Mode.IN_PROCESS and self.model_server is not ModelServer.MMS:
raise ValueError(
"IN_PROCESS mode is only supported for MMS/Transformers server in beta release."
)

if self.mode == Mode.IN_PROCESS and self.model_server == ModelServer.MMS:
Copy link
Collaborator

@samruds samruds Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this check will In process mode be accessed (since we are saying if Mode.INPROCESS and ModelServer.MMS throw an exception)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it, thank you, let me know if there's anything else I can add.

raise Exception(
"IN_PROCESS mode is supported for MMS/Transformers server in beta release."
)

if self.inference_spec and self.model:
raise ValueError("Can only set one of the following: model, inference_spec.")
Expand Down
26 changes: 23 additions & 3 deletions src/sagemaker/serve/builder/transformers_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
)
from sagemaker.serve.detector.pickler import save_pkl
from sagemaker.serve.utils.optimize_utils import _is_optimized
from sagemaker.serve.utils.predictors import TransformersLocalModePredictor
from sagemaker.serve.utils.predictors import (
TransformersLocalModePredictor,
TransformersInProcessModePredictor,
)
from sagemaker.serve.utils.types import ModelServer
from sagemaker.serve.mode.function_pointers import Mode
from sagemaker.serve.utils.telemetry_logger import _capture_telemetry
Expand All @@ -44,6 +47,7 @@

logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 1800
LOCAL_MODES = [Mode.LOCAL_CONTAINER, Mode.IN_PROCESS]


"""Retrieves images for different libraries - Pytorch, TensorFlow from HuggingFace hub
Expand Down Expand Up @@ -161,7 +165,7 @@ def _get_hf_metadata_create_model(self) -> Type[Model]:
vpc_config=self.vpc_config,
)

if not self.image_uri and self.mode == Mode.LOCAL_CONTAINER:
if self.mode in LOCAL_MODES:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need the not self.image_uri here . The new check will enable local mode for both IN_PROCESS and LOCAL_CONTAINER mode. Think the check needs to be kept as is, and not altered to use the List of Modes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, I had missed that. It is returned to how it was

self.image_uri = pysdk_model.serving_image_uri(
self.sagemaker_session.boto_region_name, "local"
)
Expand Down Expand Up @@ -227,6 +231,22 @@ def _transformers_model_builder_deploy_wrapper(self, *args, **kwargs) -> Type[Pr
)
return predictor

if self.mode == Mode.IN_PROCESS:
timeout = kwargs.get("model_data_download_timeout")

predictor = TransformersInProcessModePredictor(
self.modes[str(Mode.IN_PROCESS)], serializer, deserializer
)

self.modes[str(Mode.IN_PROCESS)].create_server(
self.image_uri,
timeout if timeout else DEFAULT_TIMEOUT,
None,
predictor,
self.pysdk_model.env,
)
return predictor

if "mode" in kwargs:
del kwargs["mode"]
if "role" in kwargs:
Expand Down Expand Up @@ -274,7 +294,7 @@ def _build_transformers_env(self):

self.pysdk_model = self._create_transformers_model()

if self.mode == Mode.LOCAL_CONTAINER:
if self.mode in LOCAL_MODES:
self._prepare_for_mode()

return self.pysdk_model
Expand Down
102 changes: 102 additions & 0 deletions src/sagemaker/serve/mode/in_process_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Module that defines the InProcessMode class"""

from __future__ import absolute_import
from pathlib import Path
import logging
from typing import Dict, Type
import time

from sagemaker.base_predictor import PredictorBase
from sagemaker.serve.spec.inference_spec import InferenceSpec
from sagemaker.serve.builder.schema_builder import SchemaBuilder
from sagemaker.serve.utils.types import ModelServer
from sagemaker.serve.utils.exceptions import LocalDeepPingException
from sagemaker.serve.model_server.multi_model_server.server import InProcessMultiModelServer
from sagemaker.session import Session

logger = logging.getLogger(__name__)

_PING_HEALTH_CHECK_FAIL_MSG = (
"Ping health check did not pass. "
+ "Please increase container_timeout_seconds or review your inference code."
)


class InProcessMode(
    InProcessMultiModelServer,
):
    """Deploys and health-checks a model served in-process (no endpoint, no container).

    Beta: only the MMS/Transformers model server is honored, and the serving
    backend itself is currently stubbed pending the FastAPI follow-up PR.
    """

    def __init__(
        self,
        model_server: ModelServer,
        inference_spec: Type[InferenceSpec],
        schema_builder: Type[SchemaBuilder],
        session: Session,
        model_path: str = None,
        env_vars: Dict = None,
    ):
        """Store serving configuration; no server is started here.

        Args:
            model_server: Target server type; only ``ModelServer.MMS`` is honored.
            inference_spec: User-provided spec whose ``load`` hydrates the model.
            schema_builder: Schema used to (de)serialize request/response payloads.
            session: SageMaker session, kept for interface parity with other modes.
            model_path: Local directory containing the model artifacts.
            env_vars: Environment variables forwarded to the server process.
        """
        # pylint: disable=bad-super-call
        super().__init__()

        self.inference_spec = inference_spec
        self.model_path = model_path
        self.env_vars = env_vars
        self.session = session
        self.schema_builder = schema_builder
        self.model_server = model_server
        # NOTE: no ECR client here — in-process mode pulls no container images
        # (removed per review; it forced an unnecessary boto client creation).
        self.client = None
        self.container = None
        self.secret_key = None
        self._ping_container = None
        self._invoke_serving = None

    def load(self, model_path: str = None):
        """Validate the model directory and delegate loading to the inference spec.

        Args:
            model_path: Optional override for ``self.model_path``.

        Returns:
            Whatever ``inference_spec.load`` returns for the resolved path.

        Raises:
            ValueError: If the path does not exist or is not a directory.
        """
        path = Path(model_path if model_path else self.model_path)
        if not path.exists():
            raise ValueError(f"model_path {path} does not exist")
        if not path.is_dir():
            raise ValueError(f"model_path {path} is not a valid directory")

        return self.inference_spec.load(str(path))

    def prepare(self):
        """No-op: in-process mode needs no artifact packaging before serving."""

    def create_server(
        self,
        image: str,
        container_timeout_in_seconds: int,
        secret_key: str,
        predictor: PredictorBase,
        env_vars: Dict[str, str] = None,
        model_path: str = None,
    ):
        """Start the in-process server and poll its health check until it passes.

        ``container_timeout_in_seconds`` is the second positional parameter to
        match the transformers builder call site, which passes
        ``(image, timeout, secret_key, predictor, env_vars)`` positionally.

        Args:
            image: Serving image URI (beta placeholder until FastAPI serving lands).
            container_timeout_in_seconds: Upper bound on the health-check wait.
            secret_key: Secret forwarded to the serving environment.
            predictor: Predictor handed to the deep-ping check.
            env_vars: Optional override of the stored environment variables.
            model_path: Optional override of the stored model path.

        Raises:
            LocalDeepPingException: If the ping check does not pass before timeout.
        """
        logger.info("Waiting for model server %s to start up...", self.model_server)

        if self.model_server == ModelServer.MMS:
            self._start_serving(
                client=self.client,
                image=image,
                model_path=model_path if model_path else self.model_path,
                secret_key=secret_key,
                env_vars=env_vars if env_vars else self.env_vars,
            )
            self._ping_container = self._multi_model_server_deep_ping

            # Poll until healthy or the deadline expires. The original
            # ``while True`` loop had no failure exit, making the trailing
            # raise unreachable and risking an infinite wait.
            deadline = time.time() + container_timeout_in_seconds
            healthy = False
            while time.time() < deadline:
                time.sleep(10)
                healthy, response = self._ping_container(predictor)
                if healthy:
                    logger.debug("Ping health check has passed. Returned %s", str(response))
                    break

            if not healthy:
                raise LocalDeepPingException(_PING_HEALTH_CHECK_FAIL_MSG)
82 changes: 79 additions & 3 deletions src/sagemaker/serve/model_server/multi_model_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,82 @@
logger = logging.getLogger(__name__)


class InProcessMultiModelServer:
    """In-process-mode Multi Model Server instance.

    Beta placeholder: serving still launches a container via the supplied
    client; the FastAPI-based in-process backend lands in the follow-up PR,
    and the invoke/ping paths below are partially stubbed until then.
    """

    def _start_serving(
        self,
        client: object,
        image: str,
        model_path: str,
        secret_key: str,
        env_vars: dict,
    ):
        """Start the serving container and remember its handle on ``self.container``.

        Args:
            client: Docker-style client exposing ``containers.run``.
            image: Image URI to run with the ``serve`` command.
            model_path: Host directory whose ``code`` subdir is bind-mounted.
            secret_key: Secret surfaced to the container via env var.
            env_vars: Caller-supplied environment variables (not mutated).
        """
        serve_env = {
            "SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code",
            "SAGEMAKER_PROGRAM": "inference.py",
            "SAGEMAKER_SERVE_SECRET_KEY": secret_key,
            "LOCAL_PYTHON": platform.python_version(),
        }
        # Merge into a fresh dict: the original ``env_vars.update(env)``
        # mutated the caller's dictionary as a side effect.
        merged_env = {**env_vars, **serve_env} if env_vars else serve_env

        self.container = client.containers.run(
            image,
            "serve",
            network_mode="host",
            detach=True,
            auto_remove=True,
            volumes={
                Path(model_path).joinpath("code"): {
                    "bind": MODE_DIR_BINDING,
                    "mode": "rw",
                },
            },
            environment=merged_env,
        )

    def _invoke_multi_model_server_serving(self, request: object, content_type: str, accept: str):
        """Invoke the MMS server with a POST request and return the raw body.

        Args:
            request: Serialized payload forwarded as the request body.
            content_type: Value for the ``Content-Type`` header.
            accept: Value for the ``Accept`` header.

        Raises:
            Exception: If the request fails or the server returns an error status.
        """
        logger.info(content_type)
        logger.info(accept)

        try:
            response = requests.post(
                "http://0.0.0.0:8080/invocations",
                data=request,
                headers={"Content-Type": content_type, "Accept": accept},
                timeout=600,
            )
            response.raise_for_status()

            logger.info(response.content)

            # Success path ends here; the original trailing
            # ``return (True, response)`` after raise/return was unreachable.
            return response.content
        except Exception as e:
            raise Exception("Unable to send request to the local container server") from e

    def _multi_model_server_deep_ping(self, predictor: PredictorBase):
        """Deep-ping stub: always reports healthy with no response payload.

        The real predictor-based check arrives with the FastAPI backend in
        the follow-up PR; until then this keeps ``create_server`` unblocked.

        Returns:
            Tuple of ``(healthy, response)`` — currently ``(True, None)``.
        """
        response = None
        return (True, response)


class LocalMultiModelServer:
"""Local Multi Model server instance"""

Expand All @@ -31,7 +107,7 @@ def _start_serving(
secret_key: str,
env_vars: dict,
):
"""Placeholder docstring"""
"""Initializes the start of the server"""
env = {
"SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/model/code",
"SAGEMAKER_PROGRAM": "inference.py",
Expand Down Expand Up @@ -59,7 +135,7 @@ def _start_serving(
)

def _invoke_multi_model_server_serving(self, request: object, content_type: str, accept: str):
"""Placeholder docstring"""
"""Invokes MMS server by hitting the docker host"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks !

try:
response = requests.post(
f"http://{get_docker_host()}:8080/invocations",
Expand All @@ -73,7 +149,7 @@ def _invoke_multi_model_server_serving(self, request: object, content_type: str,
raise Exception("Unable to send request to the local container server") from e

def _multi_model_server_deep_ping(self, predictor: PredictorBase):
"""Placeholder docstring"""
"""Deep ping in order to ensure prediction"""
response = None
try:
response = predictor.predict(self.schema_builder.sample_input)
Expand Down
12 changes: 11 additions & 1 deletion src/sagemaker/serve/utils/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Placeholder Docstring"""
"""Exceptions used across different model builder invocations"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice thanks for the update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, thanks for the suggestion.


from __future__ import absolute_import

Expand All @@ -24,6 +24,16 @@ def __init__(self, message):
super().__init__(message=message)


class InProcessDeepPingException(ModelBuilderException):
    """Raise when in process model serving does not pass the deep ping check"""

    # Message template; presumably formatted by ModelBuilderException with the
    # failure detail — confirm against the base class.
    fmt = "Error Message: {message}"
    # Numeric code mirroring the sibling exceptions in this module (e.g.
    # LocalDeepPingException) — NOTE(review): verify uniqueness is not required.
    model_builder_error_code = 1

    def __init__(self, message):
        super().__init__(message=message)


class LocalModelOutOfMemoryException(ModelBuilderException):
"""Raise when local model serving fails to load the model"""

Expand Down
Loading