diff --git a/src/sagemaker/processing.py b/src/sagemaker/processing.py index cb8545d189..e118c5e868 100644 --- a/src/sagemaker/processing.py +++ b/src/sagemaker/processing.py @@ -10,10 +10,10 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -"""This module contains code related to the Processor class, which is used -for Processing jobs. These jobs let users perform data pre-processing, -post-processing, feature engineering, data validation, and model evaluation -and interpretation on SageMaker. +"""This module contains code related to the ``Processor`` class, which is used +for Amazon SageMaker Processing Jobs. These jobs let users perform data pre-processing, +post-processing, feature engineering, data validation, and model evaluation, +and interpretation on Amazon SageMaker. """ from __future__ import print_function, absolute_import @@ -29,7 +29,7 @@ class Processor(object): - """Handles Amazon SageMaker processing tasks.""" + """Handles Amazon SageMaker Processing tasks.""" def __init__( self, @@ -48,40 +48,43 @@ def __init__( tags=None, network_config=None, ): - """Initialize a ``Processor`` instance. The Processor handles Amazon - SageMaker processing tasks. + """Initializes a ``Processor`` instance. The ``Processor`` handles Amazon + SageMaker Processing tasks. Args: - role (str): An AWS IAM role name or ARN. The Amazon SageMaker training jobs - and APIs that create Amazon SageMaker endpoints use this role - to access training data and model artifacts. After the endpoint - is created, the inference code might use the IAM role, if it - needs to access an AWS resource. - image_uri (str): The uri of the image to use for the processing - jobs started by the Processor. + role (str): An AWS IAM role name or ARN. 
Amazon SageMaker Processing + uses this role to access AWS resources, such as + data stored in Amazon S3. + image_uri (str): The URI of the Docker image to use for the + processing jobs. instance_count (int): The number of instances to run - the Processing job with. - instance_type (str): Type of EC2 instance to use for + a processing job with. + instance_type (str): The type of EC2 instance to use for processing, for example, 'ml.c4.xlarge'. - entrypoint ([str]): The entrypoint for the processing job. + entrypoint (list[str]): The entrypoint for the processing job (default: None). + This is in the form of a list of strings that make a command. volume_size_in_gb (int): Size in GB of the EBS volume to use for storing data during processing (default: 30). volume_kms_key (str): A KMS key for the processing - volume. - output_kms_key (str): The KMS key id for all ProcessingOutputs. - max_runtime_in_seconds (int): Timeout in seconds - After this amount of time Amazon SageMaker terminates the job + volume (default: None). + output_kms_key (str): The KMS key ID for processing job outputs (default: None). + max_runtime_in_seconds (int): Timeout in seconds (default: None). + After this amount of time, Amazon SageMaker terminates the job, regardless of its current status. - base_job_name (str): Prefix for processing name. If not specified, + base_job_name (str): Prefix for processing job name. If not specified, the processor generates a default job name, based on the - training image name and current timestamp. - sagemaker_session (sagemaker.session.Session): Session object which - manages interactions with Amazon SageMaker APIs and any other - AWS services needed. If not specified, the processor creates one - using the default AWS configuration chain. - env (dict): Environment variables to be passed to the processing job. - tags ([dict]): List of tags to be passed to the processing job. 
- network_config (sagemaker.network.NetworkConfig): A NetworkConfig + processing image name and current timestamp. + sagemaker_session (:class:`~sagemaker.session.Session`): + Session object which manages interactions with Amazon SageMaker and + any other AWS services needed. If not specified, the processor creates + one using the default AWS configuration chain. + env (dict[str, str]): Environment variables to be passed to + the processing jobs (default: None). + tags (list[dict]): List of tags to be passed to the processing job + (default: None). For more, see + https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html. + network_config (:class:`~sagemaker.network.NetworkConfig`): + A :class:`~sagemaker.network.NetworkConfig` object that configures network isolation, encryption of inter-container traffic, security group IDs, and subnets. """ @@ -115,25 +118,28 @@ def run( job_name=None, experiment_config=None, ): - """Run a processing job. + """Runs a processing job. Args: - inputs ([sagemaker.processing.ProcessingInput]): Input files for the processing - job. These must be provided as ProcessingInput objects. - outputs ([sagemaker.processing.ProcessingOutput]): Outputs for the processing - job. These can be specified as either a path string or a ProcessingOutput - object. - arguments ([str]): A list of string arguments to be passed to a - processing job. + inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for + the processing job. These must be provided as + :class:`~sagemaker.processing.ProcessingInput` objects (default: None). + outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for + the processing job. These can be specified as either path strings or + :class:`~sagemaker.processing.ProcessingOutput` objects (default: None). + arguments (list[str]): A list of string arguments to be passed to a + processing job (default: None). wait (bool): Whether the call should wait until the job completes (default: True). 
logs (bool): Whether to show the logs produced by the job. - Only meaningful when wait is True (default: True). + Only meaningful when ``wait`` is True (default: True). job_name (str): Processing job name. If not specified, the processor generates - a default job name, based on the image name and current timestamp. + a default job name, based on the base job name and current timestamp. experiment_config (dict[str, str]): Experiment management configuration. - Dictionary contains three optional keys, + Dictionary contains three optional keys: 'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'. + Raises: + ValueError: if ``logs`` is True but ``wait`` is False. """ if logs and not wait: raise ValueError( @@ -158,7 +164,7 @@ def run( self.latest_job.wait(logs=logs) def _generate_current_job_name(self, job_name=None): - """Generate the job name before running a processing job. + """Generates the job name before running a processing job. Args: job_name (str): Name of the processing job to be created. If not @@ -179,15 +185,19 @@ def _generate_current_job_name(self, job_name=None): return name_from_base(base_name) def _normalize_inputs(self, inputs=None): - """Ensure that all the ProcessingInput objects have names and S3 uris. + """Ensures that all the ``ProcessingInput`` objects have names and S3 URIs. Args: - inputs ([sagemaker.processing.ProcessingInput]): A list of ProcessingInput - objects to be normalized. + inputs (list[sagemaker.processing.ProcessingInput]): A list of ``ProcessingInput`` + objects to be normalized (default: None). If not specified, + an empty list is returned. Returns: - [sagemaker.processing.ProcessingInput]: The list of normalized - ProcessingInput objects. + list[sagemaker.processing.ProcessingInput]: The list of normalized + ``ProcessingInput`` objects. + + Raises: + TypeError: if the inputs are not ``ProcessingInput`` objects. """ # Initialize a list of normalized ProcessingInput objects. 
normalized_inputs = [] @@ -220,17 +230,21 @@ def _normalize_inputs(self, inputs=None): return normalized_inputs def _normalize_outputs(self, outputs=None): - """Ensure that all the outputs are ProcessingOutput objects with - names and S3 uris. + """Ensures that all the outputs are ``ProcessingOutput`` objects with + names and S3 URIs. Args: - outputs ([sagemaker.processing.ProcessingOutput]): A list - of outputs to be normalized. Can be either strings or - ProcessingOutput objects. + outputs (list[sagemaker.processing.ProcessingOutput]): A list + of outputs to be normalized (default: None). Can be either strings or + ``ProcessingOutput`` objects. If not specified, + an empty list is returned. Returns: - [sagemaker.processing.ProcessingOutput]: The list of normalized - ProcessingOutput objects. + list[sagemaker.processing.ProcessingOutput]: The list of normalized + ``ProcessingOutput`` objects. + + Raises: + TypeError: if the outputs are not ``ProcessingOutput`` objects. """ # Initialize a list of normalized ProcessingOutput objects. normalized_outputs = [] @@ -277,41 +291,43 @@ def __init__( tags=None, network_config=None, ): - """Initialize a ``ScriptProcessor`` instance. The ScriptProcessor - handles Amazon SageMaker processing tasks for jobs using script mode. + """Initializes a ``ScriptProcessor`` instance. The ``ScriptProcessor`` + handles Amazon SageMaker Processing tasks for jobs using a machine learning framework. Args: - role (str): An AWS IAM role name or ARN. The Amazon SageMaker training jobs - and APIs that create Amazon SageMaker endpoints use this role - to access training data and model artifacts. After the endpoint - is created, the inference code might use the IAM role, if it - needs to access an AWS resource. - image_uri (str): The uri of the image to use for the processing - jobs started by the Processor. + role (str): An AWS IAM role name or ARN. 
Amazon SageMaker Processing + uses this role to access AWS resources, such as + data stored in Amazon S3. + image_uri (str): The URI of the Docker image to use for the + processing jobs. command ([str]): The command to run, along with any command-line flags. Example: ["python3", "-v"]. instance_count (int): The number of instances to run - the Processing job with. - instance_type (str): Type of EC2 instance to use for + a processing job with. + instance_type (str): The type of EC2 instance to use for processing, for example, 'ml.c4.xlarge'. volume_size_in_gb (int): Size in GB of the EBS volume to use for storing data during processing (default: 30). volume_kms_key (str): A KMS key for the processing - volume. - output_kms_key (str): The KMS key id for all ProcessingOutputs. - max_runtime_in_seconds (int): Timeout in seconds. - After this amount of time Amazon SageMaker terminates the job + volume (default: None). + output_kms_key (str): The KMS key ID for processing job outputs (default: None). + max_runtime_in_seconds (int): Timeout in seconds (default: None). + After this amount of time, Amazon SageMaker terminates the job, regardless of its current status. base_job_name (str): Prefix for processing name. If not specified, the processor generates a default job name, based on the - training image name and current timestamp. - sagemaker_session (sagemaker.session.Session): Session object which - manages interactions with Amazon SageMaker APIs and any other - AWS services needed. If not specified, the processor creates one - using the default AWS configuration chain. - env (dict): Environment variables to be passed to the processing job. - tags ([dict]): List of tags to be passed to the processing job. - network_config (sagemaker.network.NetworkConfig): A NetworkConfig + processing image name and current timestamp. 
+ sagemaker_session (:class:`~sagemaker.session.Session`): + Session object which manages interactions with Amazon SageMaker and + any other AWS services needed. If not specified, the processor creates + one using the default AWS configuration chain. + env (dict[str, str]): Environment variables to be passed to + the processing jobs (default: None). + tags (list[dict]): List of tags to be passed to the processing job + (default: None). For more, see + https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html. + network_config (:class:`~sagemaker.network.NetworkConfig`): + A :class:`~sagemaker.network.NetworkConfig` object that configures network isolation, encryption of inter-container traffic, security group IDs, and subnets. """ @@ -346,25 +362,26 @@ def run( job_name=None, experiment_config=None, ): - """Run a processing job with Script Mode. + """Runs a processing job. Args: - code (str): This can be an S3 uri or a local path to either - a directory or a file with the user's script to run. - inputs ([sagemaker.processing.ProcessingInput]): Input files for the processing - job. These must be provided as ProcessingInput objects. - outputs ([str or sagemaker.processing.ProcessingOutput]): Outputs for the processing - job. These can be specified as either a path string or a ProcessingOutput - object. - arguments ([str]): A list of string arguments to be passed to a - processing job. + code (str): This can be an S3 URI or a local path to + a file with the framework script to run. + inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for + the processing job. These must be provided as + :class:`~sagemaker.processing.ProcessingInput` objects (default: None). + outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for + the processing job. These can be specified as either path strings or + :class:`~sagemaker.processing.ProcessingOutput` objects (default: None). 
+ arguments (list[str]): A list of string arguments to be passed to a + processing job (default: None). wait (bool): Whether the call should wait until the job completes (default: True). logs (bool): Whether to show the logs produced by the job. Only meaningful when wait is True (default: True). job_name (str): Processing job name. If not specified, the processor generates - a default job name, based on the image name and current timestamp. + a default job name, based on the base job name and current timestamp. experiment_config (dict[str, str]): Experiment management configuration. - Dictionary contains three optional keys, + Dictionary contains three optional keys: 'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'. """ self._current_job_name = self._generate_current_job_name(job_name=job_name) @@ -415,6 +432,9 @@ def _handle_user_code_url(self, code): Returns: str: The S3 URL to the customer's code. + Raises: + ValueError: if the code isn't found, is a directory, or + does not have a valid URL scheme. """ code_url = urlparse(code) if code_url.scheme == "s3": @@ -446,13 +466,13 @@ def _handle_user_code_url(self, code): def _upload_code(self, code): """Uploads a code file or directory specified as a string - and returns the S3 uri. + and returns the S3 URI. Args: code (str): A file or directory to be uploaded to S3. Returns: - str: The S3 uri of the uploaded file or directory. + str: The S3 URI of the uploaded file or directory. """ desired_s3_uri = os.path.join( @@ -467,15 +487,16 @@ def _upload_code(self, code): ) def _convert_code_and_add_to_inputs(self, inputs, s3_uri): - """Creates a ProcessingInput object from an S3 uri and adds it to the list of inputs. + """Creates a ``ProcessingInput`` object from an S3 URI and adds it to the list of inputs. Args: - inputs ([sagemaker.processing.ProcessingInput]): List of ProcessingInput objects. - s3_uri (str): S3 uri of the input to be added to inputs. 
+ inputs (list[sagemaker.processing.ProcessingInput]): + List of ``ProcessingInput`` objects. + s3_uri (str): S3 URI of the input to be added to inputs. Returns: - [sagemaker.processing.ProcessingInput]: A new list of ProcessingInput objects, with - the ProcessingInput object created from s3_uri appended to the list. + list[sagemaker.processing.ProcessingInput]: A new list of ``ProcessingInput`` objects, + with the ``ProcessingInput`` object created from ``s3_uri`` appended to the list. """ code_file_input = ProcessingInput( @@ -506,16 +527,16 @@ def __init__(self, sagemaker_session, job_name, inputs, outputs, output_kms_key= """Initializes a Processing job. Args: - sagemaker_session (sagemaker.session.Session): Session object which - manages interactions with Amazon SageMaker APIs and any other - AWS services needed. If not specified, one is created using - the default AWS configuration chain. + sagemaker_session (:class:`~sagemaker.session.Session`): + Session object which manages interactions with Amazon SageMaker and + any other AWS services needed. This argument is required; + it has no default value. job_name (str): Name of the Processing job. - inputs ([sagemaker.processing.ProcessingInput]): A list of ProcessingInput objects. - outputs ([sagemaker.processing.ProcessingOutput]): A list of ProcessingOutput objects. - output_kms_key (str): The output kms key associated with the job. Defaults to None - if not provided. - + inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): A list of + :class:`~sagemaker.processing.ProcessingInput` objects. + outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): A list of + :class:`~sagemaker.processing.ProcessingOutput` objects. + output_kms_key (str): The output KMS key associated with the job (default: None).
""" self.inputs = inputs self.outputs = outputs @@ -524,21 +545,22 @@ def __init__(self, sagemaker_session, job_name, inputs, outputs, output_kms_key= @classmethod def start_new(cls, processor, inputs, outputs, experiment_config): - """Start a new processing job using the provided inputs and outputs. + """Starts a new processing job using the provided inputs and outputs. Args: - processor (sagemaker.processing.Processor): The Processor instance + processor (:class:`~sagemaker.processing.Processor`): The ``Processor`` instance that started the job. - inputs ([sagemaker.processing.ProcessingInput]): A list of ProcessingInput objects. - outputs ([sagemaker.processing.ProcessingOutput]): A list of ProcessingOutput objects. + inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): A list of + :class:`~sagemaker.processing.ProcessingInput` objects. + outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): A list of + :class:`~sagemaker.processing.ProcessingOutput` objects. experiment_config (dict[str, str]): Experiment management configuration. - Dictionary contains three optional keys, + Dictionary contains three optional keys: 'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'. Returns: - sagemaker.processing.ProcessingJob: The instance of ProcessingJob created - using the current job name. - + :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created + using the ``Processor``. """ # Initialize an empty dictionary for arguments to be passed to sagemaker_session.process. process_request_args = {} @@ -611,18 +633,18 @@ def start_new(cls, processor, inputs, outputs, experiment_config): @classmethod def from_processing_name(cls, sagemaker_session, processing_job_name): - """Initializes a Processing job from a Processing job name. + """Initializes a ``ProcessingJob`` from a processing job name. Args: processing_job_name (str): Name of the processing job. 
- sagemaker_session (sagemaker.session.Session): Session object which - manages interactions with Amazon SageMaker APIs and any other - AWS services needed. If not specified, one is created using - the default AWS configuration chain. + sagemaker_session (:class:`~sagemaker.session.Session`): + Session object which manages interactions with Amazon SageMaker and + any other AWS services needed. This argument is required; + it has no default value. Returns: - sagemaker.processing.ProcessingJob: The instance of ProcessingJob created - using the current job name. + :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created + from the job name. """ job_desc = sagemaker_session.describe_processing_job(job_name=processing_job_name) @@ -659,18 +681,18 @@ def from_processing_name(cls, sagemaker_session, processing_job_name): @classmethod def from_processing_arn(cls, sagemaker_session, processing_job_arn): - """Initializes a Processing job from a Processing ARN. + """Initializes a ``ProcessingJob`` from a Processing ARN. Args: processing_job_arn (str): ARN of the processing job. - sagemaker_session (sagemaker.session.Session): Session object which - manages interactions with Amazon SageMaker APIs and any other - AWS services needed. If not specified, one is created using - the default AWS configuration chain. + sagemaker_session (:class:`~sagemaker.session.Session`): + Session object which manages interactions with Amazon SageMaker and + any other AWS services needed. This argument is required; + it has no default value. Returns: - sagemaker.processing.ProcessingJob: The instance of ProcessingJob created - using the current job name. + :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created + from the processing job's ARN.
""" processing_job_name = processing_job_arn.split(":")[5][ len("processing-job/") : @@ -681,12 +703,22 @@ def from_processing_arn(cls, sagemaker_session, processing_job_arn): def _is_local_channel(self, input_url): """Used for Local Mode. Not yet implemented. + Args: - input_url (str): + input_url (str): input URL + + Raises: + NotImplementedError: this method is not yet implemented. """ raise NotImplementedError def wait(self, logs=True): + """Waits for the processing job to complete. + + Args: + logs (bool): Whether to show the logs produced by the job (default: True). + + """ if logs: self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True) else: @@ -702,8 +734,8 @@ def stop(self): class ProcessingInput(object): - """Accepts parameters that specify an S3 input for a processing job and provides - a method to turn those parameters into a dictionary.""" + """Accepts parameters that specify an Amazon S3 input for a processing job and + provides a method to turn those parameters into a dictionary.""" def __init__( self, @@ -715,16 +747,16 @@ def __init__( s3_data_distribution_type="FullyReplicated", s3_compression_type="None", ): - """Initialize a ``ProcessingInput`` instance. ProcessingInput accepts parameters - that specify an S3 input for a processing job and provides a method + """Initializes a ``ProcessingInput`` instance. ``ProcessingInput`` accepts parameters + that specify an Amazon S3 input for a processing job and provides a method to turn those parameters into a dictionary. Args: source (str): The source for the input. If a local path is provided, it will - automatically be uploaded to s3 under: + automatically be uploaded to S3 under: "s3://<default-bucket-name>/<job-name>/input/<input_name>". destination (str): The destination of the input. - input_name (str): The user-provided name for the input. If a name + input_name (str): The name for the input. If a name is not provided, one will be generated (eg. "input-1").
s3_data_type (str): Valid options are "ManifestFile" or "S3Prefix". s3_input_mode (str): Valid options are "Pipe" or "File". @@ -765,12 +797,12 @@ def _to_request_dict(self): class ProcessingOutput(object): - """Accepts parameters that specify an S3 output for a processing job and provides + """Accepts parameters that specify an Amazon S3 output for a processing job and provides a method to turn those parameters into a dictionary.""" def __init__(self, source, destination=None, output_name=None, s3_upload_mode="EndOfJob"): - """Initialize a ``ProcessingOutput`` instance. ProcessingOutput accepts parameters that - specify an S3 output for a processing job and provides a method to turn + """Initializes a ``ProcessingOutput`` instance. ``ProcessingOutput`` accepts parameters that + specify an Amazon S3 output for a processing job and provides a method to turn those parameters into a dictionary. Args: