Use specified output_kms_key when packaging code with FrameworkProcessor #3390

Closed
wants to merge 12 commits into from
Changes from 11 commits
1 change: 1 addition & 0 deletions CODEOWNERS
@@ -0,0 +1 @@
* @aws/sagemaker-ml-frameworks
5 changes: 2 additions & 3 deletions src/sagemaker/git_utils.py
@@ -279,9 +279,8 @@ def _run_clone_command(repo_url, dest_dir):
subprocess.check_call(["git", "clone", repo_url, dest_dir], env=my_env)
elif repo_url.startswith("git@"):
with tempfile.NamedTemporaryFile() as sshnoprompt:
write_pipe = open(sshnoprompt.name, "w")
write_pipe.write("ssh -oBatchMode=yes $@")
write_pipe.close()
with open(sshnoprompt.name, "w") as write_pipe:
write_pipe.write("ssh -oBatchMode=yes $@")
os.chmod(sshnoprompt.name, 0o511)
my_env["GIT_SSH"] = sshnoprompt.name
subprocess.check_call(["git", "clone", repo_url, dest_dir], env=my_env)
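A note on this path: the context-manager form guarantees the GIT_SSH wrapper file is flushed and closed before it is chmod-ed and handed to git. The wrapper is only exercised when job code is pulled from a Git repository over SSH, for example (the repository URL, role ARN, and file names below are placeholders, not part of this PR):

```python
from sagemaker.sklearn.estimator import SKLearn

# Hypothetical repository and role ARN, for illustration only.
estimator = SKLearn(
    entry_point="train.py",
    source_dir="src",
    git_config={"repo": "git@github.com:example-org/example-repo.git", "branch": "main"},
    framework_version="1.0-1",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    role="arn:aws:iam::111122223333:role/ExampleSageMakerRole",
)

# fit() triggers git_utils._run_clone_command, which writes the batch-mode
# GIT_SSH wrapper shown above before running `git clone` over SSH.
estimator.fit()
```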
66 changes: 65 additions & 1 deletion src/sagemaker/image_uri_config/tensorflow.json
@@ -285,7 +285,9 @@
"2.5": "2.5.1",
"2.6": "2.6.3",
"2.7": "2.7.0",
"2.8": "2.8.0"
"2.8": "2.8.0",
"2.9": "2.9.2",
"2.10": "2.10.0"
},
"versions": {
"1.10.0": {
@@ -1468,6 +1470,68 @@
"us-west-2": "763104351884"
},
"repository": "tensorflow-inference"
},
"2.9.2": {
"registries": {
"af-south-1": "626614931356",
"ap-east-1": "871362719292",
"ap-northeast-1": "763104351884",
"ap-northeast-2": "763104351884",
"ap-northeast-3": "364406365360",
"ap-south-1": "763104351884",
"ap-southeast-1": "763104351884",
"ap-southeast-2": "763104351884",
"ap-southeast-3": "907027046896",
"ca-central-1": "763104351884",
"cn-north-1": "727897471807",
"cn-northwest-1": "727897471807",
"eu-central-1": "763104351884",
"eu-north-1": "763104351884",
"eu-south-1": "692866216735",
"eu-west-1": "763104351884",
"eu-west-2": "763104351884",
"eu-west-3": "763104351884",
"me-south-1": "217643126080",
"sa-east-1": "763104351884",
"us-east-1": "763104351884",
"us-east-2": "763104351884",
"us-gov-west-1": "442386744353",
"us-iso-east-1": "886529160074",
"us-west-1": "763104351884",
"us-west-2": "763104351884"
},
"repository": "tensorflow-inference"
},
"2.10.0": {
"registries": {
"af-south-1": "626614931356",
"ap-east-1": "871362719292",
"ap-northeast-1": "763104351884",
"ap-northeast-2": "763104351884",
"ap-northeast-3": "364406365360",
"ap-south-1": "763104351884",
"ap-southeast-1": "763104351884",
"ap-southeast-2": "763104351884",
"ap-southeast-3": "907027046896",
"ca-central-1": "763104351884",
"cn-north-1": "727897471807",
"cn-northwest-1": "727897471807",
"eu-central-1": "763104351884",
"eu-north-1": "763104351884",
"eu-south-1": "692866216735",
"eu-west-1": "763104351884",
"eu-west-2": "763104351884",
"eu-west-3": "763104351884",
"me-south-1": "217643126080",
"sa-east-1": "763104351884",
"us-east-1": "763104351884",
"us-east-2": "763104351884",
"us-gov-west-1": "442386744353",
"us-iso-east-1": "886529160074",
"us-west-1": "763104351884",
"us-west-2": "763104351884"
},
"repository": "tensorflow-inference"
}
}
},
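With the 2.9.2 and 2.10.0 entries registered, the standard image URI lookup can resolve the new versions. A small sketch, assuming an arbitrary region and instance type:

```python
from sagemaker import image_uris

# Resolves against the tensorflow-inference registries added in this file.
uri = image_uris.retrieve(
    framework="tensorflow",
    region="us-west-2",
    version="2.10",
    image_scope="inference",
    instance_type="ml.c5.xlarge",
)
print(uri)  # e.g. 763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.10.0-cpu
```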
20 changes: 12 additions & 8 deletions src/sagemaker/processing.py
@@ -1493,6 +1493,7 @@ def _create_estimator(
enable_network_isolation=False, # True -> uploads to input channel. Not what we want!
image_uri=self.image_uri,
role=self.role,
output_kms_key=self.output_kms_key,
# Estimator instance_count doesn't currently matter to FrameworkProcessor, and the
# SKLearn Framework Estimator requires instance_count==1. So here we hard-wire it to 1,
# but if it matters in future perhaps we could take self.instance_count here and have
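With output_kms_key now forwarded to the internal estimator, the key supplied to the processor is also applied when the SDK packages and uploads the job's code, which is the behavior the PR title describes. A minimal sketch, assuming placeholder KMS key and role ARNs:

```python
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn

# Hypothetical KMS key and role ARNs, for illustration only.
processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="1.0-1",
    role="arn:aws:iam::111122223333:role/ExampleSageMakerRole",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_kms_key="arn:aws:kms:us-west-2:111122223333:key/example-key-id",
)

# The packaged sourcedir.tar.gz and generated runproc.sh are now uploaded
# with the same output_kms_key; previously the key was not passed to the
# code-packaging step.
processor.run(code="preprocess.py", source_dir="processing")
```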
@@ -1587,13 +1588,13 @@ def run( # type: ignore[override]
framework script to run. Path (absolute or relative) to the local
Python source file which should be executed as the entry point
to training. When `code` is an S3 URI, ignore `source_dir`,
`dependencies, and `git_config`. If ``source_dir`` is specified,
`dependencies`, and `git_config`. If ``source_dir`` is specified,
then ``code`` must point to a file located at the root of ``source_dir``.
source_dir (str): Path (absolute, relative or an S3 URI) to a directory
with any other processing source code dependencies aside from the entry
point file (default: None). If ``source_dir`` is an S3 URI, it must
point to a tar.gz file. Structure within this directory are preserved
when processing on Amazon SageMaker (default: None).
point to a file named `sourcedir.tar.gz`. The structure within this directory
is preserved when processing on Amazon SageMaker (default: None).
dependencies (list[str]): A list of paths to directories (absolute
or relative) with any additional libraries that will be exported
to the container (default: []). The library folders will be
@@ -1730,12 +1731,15 @@ def _pack_and_upload_code(
"sagemaker_session unspecified when creating your Processor to have one set up "
"automatically."
)
if "/sourcedir.tar.gz" in estimator.uploaded_code.s3_prefix:
# Upload the bootstrapping code as s3://.../jobname/source/runproc.sh.
entrypoint_s3_uri = estimator.uploaded_code.s3_prefix.replace(
"sourcedir.tar.gz",
"runproc.sh",
)
else:
raise RuntimeError("S3 source_dir file must be named `sourcedir.tar.gz.`")

# Upload the bootstrapping code as s3://.../jobname/source/runproc.sh.
entrypoint_s3_uri = estimator.uploaded_code.s3_prefix.replace(
"sourcedir.tar.gz",
"runproc.sh",
)
script = estimator.uploaded_code.script_name
s3_runproc_sh = S3Uploader.upload_string_as_file_body(
self._generate_framework_script(script),
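The new guard in _pack_and_upload_code means that when source_dir is an S3 URI it must point at a file literally named sourcedir.tar.gz; anything else now fails fast instead of producing a broken runproc.sh path. Reusing the hypothetical processor from the sketch above:

```python
# Accepted: an S3 source_dir that points at a sourcedir.tar.gz archive.
processor.run(
    code="preprocess.py",
    source_dir="s3://example-bucket/code/sourcedir.tar.gz",
)

# Rejected after this change, raising:
#   RuntimeError: S3 source_dir file must be named `sourcedir.tar.gz`.
# processor.run(
#     code="preprocess.py",
#     source_dir="s3://example-bucket/code/other-archive.tar.gz",
# )
```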
158 changes: 155 additions & 3 deletions src/sagemaker/transformer.py
@@ -14,14 +14,17 @@
from __future__ import absolute_import

from typing import Union, Optional, List, Dict
from botocore import exceptions
import logging
import copy
import time

from botocore import exceptions
from sagemaker.job import _Job
from sagemaker.session import Session
from sagemaker.session import Session, get_execution_role
from sagemaker.inputs import BatchDataCaptureConfig
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.workflow.pipeline_context import runnable_by_pipeline, PipelineSession
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.utils import base_name_from_image, name_from_base
@@ -266,6 +269,155 @@ def transform(
if wait:
self.latest_transform_job.wait(logs=logs)

def transform_with_monitoring(
self,
monitoring_config,
monitoring_resource_config,
data: str,
data_type: str = "S3Prefix",
content_type: str = None,
compression_type: str = None,
split_type: str = None,
input_filter: str = None,
output_filter: str = None,
join_source: str = None,
model_client_config: Dict[str, str] = None,
batch_data_capture_config: BatchDataCaptureConfig = None,
monitor_before_transform: bool = False,
supplied_baseline_statistics: str = None,
supplied_baseline_constraints: str = None,
wait: bool = True,
pipeline_name: str = None,
role: str = None,
):
"""Runs a transform job with monitoring job.

Note that this function does not start a transform job immediately;
instead, it creates a SageMaker Pipeline and executes it.
If you provide an existing pipeline_name, no new pipeline is created; otherwise,
each transform_with_monitoring call creates and executes a new pipeline.

Args:
monitoring_config (Union[
`sagemaker.workflow.quality_check_step.QualityCheckConfig`,
`sagemaker.workflow.clarify_check_step.ClarifyCheckConfig`
]): the monitoring configuration used to run model monitoring.
monitoring_resource_config (`sagemaker.workflow.check_job_config.CheckJobConfig`):
the check job (processing job) cluster resource configuration.
data (str): Input data location in S3 for the transform job.
data_type (str): What the S3 location defines (default: 'S3Prefix').
Valid values:
* 'S3Prefix' - the S3 URI defines a key name prefix. All objects with this prefix
will be used as inputs for the transform job.
* 'ManifestFile' - the S3 URI points to a single manifest file listing each S3
object to use as an input for the transform job.
content_type (str): MIME type of the input data (default: None).
compression_type (str): Compression type of the input data, if
compressed (default: None). Valid values: 'Gzip', None.
split_type (str): The record delimiter for the input object
(default: 'None'). Valid values: 'None', 'Line', 'RecordIO', and
'TFRecord'.
input_filter (str): A JSONPath to select a portion of the input to
pass to the algorithm container for inference. If you omit the
field, it gets the value '$', representing the entire input.
For CSV data, each row is taken as a JSON array,
so only index-based JSONPaths can be applied, e.g. $[0], $[1:].
CSV data should follow the `RFC format <https://tools.ietf.org/html/rfc4180>`_.
See `Supported JSONPath Operators
<https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html#data-processing-operators>`_
for a table of supported JSONPath operators.
For more information, see the SageMaker API documentation for
`CreateTransformJob
<https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html>`_.
Some examples: "$[1:]", "$.features" (default: None).
output_filter (str): A JSONPath to select a portion of the
joined/original output to return as the output.
For more information, see the SageMaker API documentation for
`CreateTransformJob
<https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html>`_.
Some examples: "$[1:]", "$.prediction" (default: None).
join_source (str): The source of data to be joined to the transform
output. It can be set to 'Input' meaning the entire input record
will be joined to the inference result. You can use OutputFilter
to select the useful portion before uploading to S3. (default:
None). Valid values: Input, None.
model_client_config (dict[str, str]): Model configuration.
Dictionary contains two optional keys,
'InvocationsTimeoutInSeconds', and 'InvocationsMaxRetries'.
(default: ``None``).
batch_data_capture_config (BatchDataCaptureConfig): Configuration object which
specifies the configurations related to the batch data capture for the transform job
(default: ``None``).
monitor_before_transform (bool): If True, the data quality or model
explainability check step runs before the transform job (default: False).
fail_on_violation (Union[bool, PipelineVariable]): An opt-out flag; set it to False
to keep the check step from failing when a violation is detected.
supplied_baseline_statistics (Union[str, PipelineVariable]): The S3 path
to the supplied statistics object representing the statistics JSON file
which will be used for the drift check (default: None).
supplied_baseline_constraints (Union[str, PipelineVariable]): The S3 path
to the supplied constraints object representing the constraints JSON file
which will be used for the drift check (default: None).
wait (bool): Whether to wait for the pipeline execution to complete (default: True).
pipeline_name (str): The name of the pipeline for the monitoring and transform steps
(default: None).
role (str): The execution role (default: None).
"""

transformer = self
if not isinstance(self.sagemaker_session, PipelineSession):
sagemaker_session = self.sagemaker_session
self.sagemaker_session = None
transformer = copy.deepcopy(self)
transformer.sagemaker_session = PipelineSession()
self.sagemaker_session = sagemaker_session

transform_step_args = transformer.transform(
data=data,
data_type=data_type,
content_type=content_type,
compression_type=compression_type,
split_type=split_type,
input_filter=input_filter,
output_filter=output_filter,
batch_data_capture_config=batch_data_capture_config,
join_source=join_source,
model_client_config=model_client_config,
)

from sagemaker.workflow.monitor_batch_transform_step import MonitorBatchTransformStep

monitoring_batch_step = MonitorBatchTransformStep(
name="MonitorBatchTransformStep",
display_name="MonitorBatchTransformStep",
description="",
transform_step_args=transform_step_args,
monitor_configuration=monitoring_config,
check_job_configuration=monitoring_resource_config,
monitor_before_transform=monitor_before_transform,
supplied_baseline_constraints=supplied_baseline_constraints,
supplied_baseline_statistics=supplied_baseline_statistics,
)

pipeline_name = (
pipeline_name if pipeline_name else f"TransformWithMonitoring{int(time.time())}"
)
# if pipeline exists, just start the execution
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
name=pipeline_name,
steps=[monitoring_batch_step],
sagemaker_session=transformer.sagemaker_session,
)
pipeline.upsert(role_arn=role if role else get_execution_role())
execution = pipeline.start()
if wait:
logging.info("Waiting for transform with monitoring to execute ...")
execution.wait()
return execution

def delete_model(self):
"""Delete the corresponding SageMaker model for this Transformer."""
self.sagemaker_session.delete_model(self.model_name)
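As the docstring notes, transform_with_monitoring builds a one-step SageMaker Pipeline around the transform rather than starting the job directly. A hedged usage sketch (model name, S3 locations, and role ARN are placeholders, and the check configs are minimal examples):

```python
from sagemaker.model_monitor import DatasetFormat
from sagemaker.transformer import Transformer
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig

role = "arn:aws:iam::111122223333:role/ExampleSageMakerRole"  # hypothetical

transformer = Transformer(
    model_name="example-model",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path="s3://example-bucket/transform-output",
)

# Creates (or reuses) a pipeline containing a MonitorBatchTransformStep,
# starts an execution, and waits for it because wait defaults to True.
execution = transformer.transform_with_monitoring(
    monitoring_config=DataQualityCheckConfig(
        baseline_dataset="s3://example-bucket/baseline/baseline.csv",
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri="s3://example-bucket/monitoring-output",
    ),
    monitoring_resource_config=CheckJobConfig(role=role),
    data="s3://example-bucket/transform-input",
    content_type="text/csv",
    role=role,
)
```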
5 changes: 3 additions & 2 deletions tests/data/multimodel/container/Dockerfile
@@ -1,4 +1,5 @@
FROM public.ecr.aws/ubuntu/ubuntu:18.04
# added latest image from https://gallery.ecr.aws/lts/ubuntu
FROM public.ecr.aws/ubuntu/ubuntu:22.04

# Set a docker label to advertise multi-model support on the container
LABEL com.amazonaws.sagemaker.capabilities.multi-models=true
@@ -15,7 +16,7 @@ RUN apt-get update && \
curl \
vim \
&& rm -rf /var/lib/apt/lists/* \
&& curl -O https://bootstrap.pypa.io/pip/3.6/get-pip.py \
&& curl -O https://bootstrap.pypa.io/pip/get-pip.py \
&& python3 get-pip.py

RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1