
Commit 6273bdd

feature: wrap up Processing feature
* Fix job name timestamps for code inputs in ScriptProcessor
* Add docstrings documenting generated names and destination for ProcessingInput and ProcessingOutput
* Add instance_count to SKLearnProcessor constructor
* fix docstring typo
1 parent e9719b3 commit 6273bdd

5 files changed: +185 -218 lines changed


src/sagemaker/model_monitor/model_monitoring.py

Lines changed: 1 addition & 1 deletion
@@ -989,7 +989,7 @@ def __init__(
         Args:
             role (str): An AWS IAM role. The Amazon SageMaker jobs use this role.
             instance_count (int): The number of instances to run the jobs with.
-            instance_type (str): Type of EC2 instance to use forthe job, for example,
+            instance_type (str): Type of EC2 instance to use for the job, for example,
                 'ml.m5.xlarge'.
             volume_size_in_gb (int): Size in GB of the EBS volume
                 to use for storing data during processing (default: 30).
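
The docstring corrected above belongs to a monitor constructor in model_monitoring.py. A minimal sketch of how these arguments are typically passed, assuming DefaultModelMonitor is the class in question (the role ARN is a placeholder):

from sagemaker.model_monitor import DefaultModelMonitor

# Sketch only: DefaultModelMonitor is an assumption about which class owns the
# docstring above; the role ARN is a placeholder.
monitor = DefaultModelMonitor(
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder IAM role
    instance_count=1,               # number of instances for the monitoring jobs
    instance_type="ml.m5.xlarge",   # EC2 instance type, as in the corrected docstring
    volume_size_in_gb=30,           # EBS volume used for data during processing
)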

src/sagemaker/processing.py

Lines changed: 41 additions & 45 deletions
@@ -11,7 +11,7 @@
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
 """This module contains code related to the Processor class, which is used
-for Processing jobs. These jobs let customers perform data pre-processing,
+for Processing jobs. These jobs let users perform data pre-processing,
 post-processing, feature engineering, data validation, and model evaluation
 and interpretation on SageMaker.
 """
@@ -148,7 +148,10 @@ def run(
         self.arguments = arguments

         self.latest_job = ProcessingJob.start_new(
-            self, normalized_inputs, normalized_outputs, experiment_config
+            processor=self,
+            inputs=normalized_inputs,
+            outputs=normalized_outputs,
+            experiment_config=experiment_config,
         )
         self.jobs.append(self.latest_job)
         if wait:
@@ -260,6 +263,7 @@ def __init__(
         self,
         role,
         image_uri,
+        command,
         instance_count,
         instance_type,
         volume_size_in_gb=30,
@@ -283,11 +287,12 @@ def __init__(
                 needs to access an AWS resource.
             image_uri (str): The uri of the image to use for the processing
                 jobs started by the Processor.
+            command ([str]): The command to run, along with any command-line flags.
+                Example: ["python3", "-v"].
             instance_count (int): The number of instances to run
                 the Processing job with.
             instance_type (str): Type of EC2 instance to use for
                 processing, for example, 'ml.c4.xlarge'.
-            py_version (str): The python version to use, for example, 'py3'.
             volume_size_in_gb (int): Size in GB of the EBS volume
                 to use for storing data during processing (default: 30).
             volume_kms_key (str): A KMS key for the processing
@@ -311,6 +316,7 @@ def __init__(
         """
         self._CODE_CONTAINER_BASE_PATH = "/opt/ml/processing/input/"
         self._CODE_CONTAINER_INPUT_NAME = "code"
+        self.command = command

         super(ScriptProcessor, self).__init__(
             role=role,
@@ -330,9 +336,7 @@ def __init__(

     def run(
         self,
-        command,
         code,
-        script_name=None,
         inputs=None,
         outputs=None,
         arguments=None,
@@ -344,13 +348,8 @@ def run(
         """Run a processing job with Script Mode.

         Args:
-            command([str]): This is a list of strings that includes the executable, along
-                with any command-line flags. For example: ["python3", "-v"]
             code (str): This can be an S3 uri or a local path to either
                 a directory or a file with the user's script to run.
-            script_name (str): If the user provides a directory for source,
-                they must specify script_name as the file within that
-                directory to use.
             inputs ([sagemaker.processing.ProcessingInput]): Input files for the processing
                 job. These must be provided as ProcessingInput objects.
             outputs ([str or sagemaker.processing.ProcessingOutput]): Outputs for the processing
@@ -369,50 +368,45 @@ def run(
         """
         self._current_job_name = self._generate_current_job_name(job_name=job_name)

-        customer_script_name = self._get_customer_script_name(code, script_name)
-        customer_code_s3_uri = self._upload_code(code)
-        inputs_with_code = self._convert_code_and_add_to_inputs(inputs, customer_code_s3_uri)
+        user_script_name = self._get_user_script_name(code)
+        user_code_s3_uri = self._upload_code(code)
+        inputs_with_code = self._convert_code_and_add_to_inputs(inputs, user_code_s3_uri)

-        self._set_entrypoint(command, customer_script_name)
+        self._set_entrypoint(self.command, user_script_name)
+
+        normalized_inputs = self._normalize_inputs(inputs_with_code)
+        normalized_outputs = self._normalize_outputs(outputs)
+        self.arguments = arguments

-        super(ScriptProcessor, self).run(
-            inputs=inputs_with_code,
-            outputs=outputs,
-            arguments=arguments,
-            wait=wait,
-            logs=logs,
-            job_name=job_name,
+        self.latest_job = ProcessingJob.start_new(
+            processor=self,
+            inputs=normalized_inputs,
+            outputs=normalized_outputs,
             experiment_config=experiment_config,
         )
+        self.jobs.append(self.latest_job)
+        if wait:
+            self.latest_job.wait(logs=logs)

-    def _get_customer_script_name(self, code, script_name):
-        """Finds the customer script name using the provided code file,
+    def _get_user_script_name(self, code):
+        """Finds the user script name using the provided code file,
         directory, or script name.

         Args:
             code (str): This can be an S3 uri or a local path to either
                 a directory or a file.
-            script_name (str): If the user provides a directory as source,
-                they must specify script_name as the file within that
-                directory to use.

         Returns:
             str: The script name from the S3 uri or from the file found
                 on the user's local machine.
         """
-        parse_result = urlparse(code)
-
-        if os.path.isdir(code) and script_name is None:
+        if os.path.isdir(code) is None or not os.path.splitext(code)[1]:
             raise ValueError(
-                """You provided a directory without providing a script name.
-                Please provide a script name inside the directory that you specified.
+                """You cannot provide a directory. Please package your code inside of a .whl
+                file and pass that in, instead.
                 """
             )
-        if (parse_result.scheme == "s3" or os.path.isdir(code)) and script_name is not None:
-            return script_name
-        if parse_result.scheme == "s3" or os.path.isfile(code):
-            return os.path.basename(code)
-        raise ValueError("The file or directory you specified does not exist.")
+        return os.path.basename(code)

     def _upload_code(self, code):
         """Uploads a code file or directory specified as a string
@@ -457,16 +451,16 @@ def _convert_code_and_add_to_inputs(self, inputs, s3_uri):
         )
         return (inputs or []) + [code_file_input]

-    def _set_entrypoint(self, command, customer_script_name):
-        """Sets the entrypoint based on the customer's script and corresponding executable.
+    def _set_entrypoint(self, command, user_script_name):
+        """Sets the entrypoint based on the user's script and corresponding executable.

         Args:
-            customer_script_name (str): A filename with an extension.
+            user_script_name (str): A filename with an extension.
         """
-        customer_script_location = os.path.join(
-            self._CODE_CONTAINER_BASE_PATH, self._CODE_CONTAINER_INPUT_NAME, customer_script_name
+        user_script_location = os.path.join(
+            self._CODE_CONTAINER_BASE_PATH, self._CODE_CONTAINER_INPUT_NAME, user_script_name
         )
-        self.entrypoint = command + [customer_script_location]
+        self.entrypoint = command + [user_script_location]


 class ProcessingJob(_Job):
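
Taken together, the ScriptProcessor changes above move command from run() to the constructor and require code to be a single script rather than a directory. A minimal sketch of the resulting call pattern, assuming a placeholder role, image URI, script, and S3 paths:

from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# command is now fixed at construction time; run() no longer accepts command or script_name.
processor = ScriptProcessor(
    role="arn:aws:iam::111122223333:role/SageMakerRole",                        # placeholder IAM role
    image_uri="111122223333.dkr.ecr.us-west-2.amazonaws.com/my-image:latest",   # placeholder image
    command=["python3"],            # executable plus any command-line flags
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# code must point to a single script (local path or S3 uri), not a directory.
processor.run(
    code="preprocess.py",           # placeholder script name
    inputs=[ProcessingInput(source="s3://my-bucket/raw", destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source="/opt/ml/processing/output")],
)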
@@ -602,7 +596,7 @@ def __init__(
             source (str): The source for the input.
             destination (str): The destination of the input.
             input_name (str): The user-provided name for the input. If a name
-                is not provided, one will be generated.
+                is not provided, one will be generated (eg. "input-1").
             s3_data_type (str): Valid options are "ManifestFile" or "S3Prefix".
             s3_input_mode (str): Valid options are "Pipe" or "File".
             s3_data_distribution_type (str): Valid options are "FullyReplicated"
@@ -652,8 +646,10 @@ def __init__(self, source, destination=None, output_name=None, s3_upload_mode="EndOfJob"):

         Args:
             source (str): The source for the output.
-            destination (str): The destination of the output.
-            output_name (str): The name of the output.
+            destination (str): The destination of the output. If a destination
+                is not provided, one will be generated (eg. "s3://bucket/job_name/output").
+            output_name (str): The name of the output. If a name
+                is not provided, one will be generated (eg. "output-1").
             s3_upload_mode (str): Valid options are "EndOfJob" or "Continuous".
         """
         self.source = source
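
The ProcessingInput and ProcessingOutput docstrings now state what is generated when names and destinations are omitted. A short sketch of relying on those defaults (bucket and container paths are placeholders):

from sagemaker.processing import ProcessingInput, ProcessingOutput

# input_name is omitted, so a name such as "input-1" is generated for this input.
data_input = ProcessingInput(
    source="s3://my-bucket/raw",                  # placeholder S3 prefix
    destination="/opt/ml/processing/input/data",  # path inside the processing container
)

# output_name and destination are omitted, so a name such as "output-1" and a
# destination such as "s3://bucket/job_name/output" are generated.
data_output = ProcessingOutput(source="/opt/ml/processing/output")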

src/sagemaker/sklearn/processing.py

Lines changed: 8 additions & 1 deletion
@@ -30,7 +30,9 @@ def __init__(
         self,
         framework_version,
         role,
+        command,
         instance_type,
+        instance_count=1,
         py_version="py3",
         volume_size_in_gb=30,
         volume_kms_key=None,
@@ -54,6 +56,10 @@ def __init__(
                 needs to access an AWS resource.
             instance_type (str): Type of EC2 instance to use for
                 processing, for example, 'ml.c4.xlarge'.
+            instance_count (int): The number of instances to run
+                the Processing job with. Defaults to 1.
+            command ([str]): The command to run, along with any command-line flags.
+                Example: ["python3", "-v"].
             py_version (str): The python version to use, for example, 'py3'.
             volume_size_in_gb (int): Size in GB of the EBS volume
                 to use for storing data during processing (default: 30).
@@ -85,8 +91,9 @@ def __init__(
         super(SKLearnProcessor, self).__init__(
             role=role,
             image_uri=image_uri,
-            instance_count=1,
+            instance_count=instance_count,
             instance_type=instance_type,
+            command=command,
             volume_size_in_gb=volume_size_in_gb,
             volume_kms_key=volume_kms_key,
             output_kms_key=output_kms_key,
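
With command and instance_count now exposed on the SKLearnProcessor constructor, a multi-instance scikit-learn processing job can be configured directly. A minimal sketch against the signature as of this commit (role, script, and framework version are placeholders):

from sagemaker.sklearn.processing import SKLearnProcessor

# command and instance_count are forwarded to the underlying ScriptProcessor;
# instance_count defaults to 1 when omitted.
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",     # placeholder scikit-learn container version
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder IAM role
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=2,               # run the Processing job on two instances
)

sklearn_processor.run(code="preprocess.py")  # placeholder script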
