diff --git a/src/sagemaker/sklearn/processing.py b/src/sagemaker/sklearn/processing.py index c2c7b80839..c5445e31f4 100644 --- a/src/sagemaker/sklearn/processing.py +++ b/src/sagemaker/sklearn/processing.py @@ -17,42 +17,24 @@ """ from __future__ import absolute_import -from sagemaker.processing import FrameworkProcessor -from sagemaker.sklearn.estimator import SKLearn +from sagemaker import image_uris, Session +from sagemaker.processing import ScriptProcessor +from sagemaker.sklearn import defaults -class SKLearnProcessor(FrameworkProcessor): - """Initialize an ``SKLearnProcessor`` instance. - - The SKLearnProcessor handles Amazon SageMaker processing tasks for jobs using scikit-learn. - - Unless ``image_uri`` is specified, the scikit-learn environment is an - Amazon-built Docker container that executes functions defined in the supplied - ``code`` Python script. - - The arguments have the exact same meaning as in ``FrameworkProcessor``. - - .. tip:: - - You can find additional parameters for initializing this class at - :class:`~sagemaker.processing.FrameworkProcessor`. 
- """ - - estimator_cls = SKLearn +class SKLearnProcessor(ScriptProcessor): + """Handles Amazon SageMaker processing tasks for jobs using scikit-learn.""" def __init__( self, - framework_version, # New arg + framework_version, role, - instance_count, instance_type, - py_version="py3", # New kwarg - image_uri=None, + instance_count, command=None, volume_size_in_gb=30, volume_kms_key=None, output_kms_key=None, - code_location=None, # New arg max_runtime_in_seconds=None, base_job_name=None, sagemaker_session=None, @@ -60,24 +42,68 @@ def __init__( tags=None, network_config=None, ): - """This processor executes a Python script in a scikit-learn execution environment.""" - super().__init__( - self.estimator_cls, - framework_version, - role, - instance_count, - instance_type, - py_version, - image_uri, - command, - volume_size_in_gb, - volume_kms_key, - output_kms_key, - code_location, - max_runtime_in_seconds, - base_job_name, - sagemaker_session, - env, - tags, - network_config, + """Initialize an ``SKLearnProcessor`` instance. + + The SKLearnProcessor handles Amazon SageMaker processing tasks for jobs using scikit-learn. + + Args: + framework_version (str): The version of scikit-learn. + role (str): An AWS IAM role name or ARN. The Amazon SageMaker training jobs + and APIs that create Amazon SageMaker endpoints use this role + to access training data and model artifacts. After the endpoint + is created, the inference code might use the IAM role, if it + needs to access an AWS resource. + instance_type (str): Type of EC2 instance to use for + processing, for example, 'ml.c4.xlarge'. + instance_count (int): The number of instances to run + the Processing job with. + command ([str]): The command to run, along with any command-line flags. + Example: ["python3", "-v"]. If not provided, ["python3"] + is used as the default command. 
+ volume_size_in_gb (int): Size in GB of the EBS volume to + use for storing data during processing (default: 30). + volume_kms_key (str): A KMS key for the processing + volume. + output_kms_key (str): The KMS key id for all ProcessingOutputs. + max_runtime_in_seconds (int): Timeout in seconds. + After this amount of time Amazon SageMaker terminates the job + regardless of its current status. + base_job_name (str): Prefix for processing job name. If not specified, + the processor generates a default job name, based on the + processing image name and current timestamp. + sagemaker_session (sagemaker.session.Session): Session object which + manages interactions with Amazon SageMaker APIs and any other + AWS services needed. If not specified, the processor creates one + using the default AWS configuration chain. + env (dict): Environment variables to be passed to the processing job. + tags ([dict]): List of tags to be passed to the processing job. + network_config (sagemaker.network.NetworkConfig): A NetworkConfig + object that configures network isolation, encryption of + inter-container traffic, security group IDs, and subnets. 
+ """ + if not command: + command = ["python3"] + + session = sagemaker_session or Session() + region = session.boto_region_name + + image_uri = image_uris.retrieve( + defaults.SKLEARN_NAME, region, version=framework_version, instance_type=instance_type + ) + + super(SKLearnProcessor, self).__init__( + role=role, + image_uri=image_uri, + instance_count=instance_count, + instance_type=instance_type, + command=command, + volume_size_in_gb=volume_size_in_gb, + volume_kms_key=volume_kms_key, + output_kms_key=output_kms_key, + max_runtime_in_seconds=max_runtime_in_seconds, + base_job_name=base_job_name, + sagemaker_session=session, + env=env, + tags=tags, + network_config=network_config, ) diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index b2db1a040d..462f2cfde7 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -349,12 +349,12 @@ def test_local_processing_sklearn(sagemaker_local_session_no_local_code, sklearn job_description = sklearn_processor.latest_job.describe() - assert len(job_description["ProcessingInputs"]) == 3 + assert len(job_description["ProcessingInputs"]) == 2 assert job_description["ProcessingResources"]["ClusterConfig"]["InstanceCount"] == 1 assert job_description["ProcessingResources"]["ClusterConfig"]["InstanceType"] == "local" assert job_description["AppSpecification"]["ContainerEntrypoint"] == [ - "/bin/bash", - "/opt/ml/processing/input/entrypoint/runproc.sh", + "python3", + "/opt/ml/processing/input/code/dummy_script.py", ] assert job_description["RoleArn"] == "" diff --git a/tests/integ/test_processing.py b/tests/integ/test_processing.py index 9729150a97..2e0693051e 100644 --- a/tests/integ/test_processing.py +++ b/tests/integ/test_processing.py @@ -125,6 +125,7 @@ def test_sklearn(sagemaker_session, sklearn_latest_version, cpu_instance_type): role=ROLE, instance_type=cpu_instance_type, instance_count=1, + command=["python3"], sagemaker_session=sagemaker_session, 
base_job_name="test-sklearn", ) @@ -138,7 +139,7 @@ def test_sklearn(sagemaker_session, sklearn_latest_version, cpu_instance_type): job_description = sklearn_processor.latest_job.describe() - assert len(job_description["ProcessingInputs"]) == 3 + assert len(job_description["ProcessingInputs"]) == 2 assert job_description["ProcessingResources"]["ClusterConfig"]["InstanceCount"] == 1 assert ( job_description["ProcessingResources"]["ClusterConfig"]["InstanceType"] == cpu_instance_type @@ -146,8 +147,8 @@ def test_sklearn(sagemaker_session, sklearn_latest_version, cpu_instance_type): assert job_description["ProcessingResources"]["ClusterConfig"]["VolumeSizeInGB"] == 30 assert job_description["StoppingCondition"] == {"MaxRuntimeInSeconds": 86400} assert job_description["AppSpecification"]["ContainerEntrypoint"] == [ - "/bin/bash", - "/opt/ml/processing/input/entrypoint/runproc.sh", + "python3", + "/opt/ml/processing/input/code/dummy_script.py", ] assert ROLE in job_description["RoleArn"] @@ -203,7 +204,6 @@ def test_sklearn_with_customizations( assert job_description["ProcessingInputs"][0]["InputName"] == "dummy_input" assert job_description["ProcessingInputs"][1]["InputName"] == "code" - assert job_description["ProcessingInputs"][2]["InputName"] == "entrypoint" assert job_description["ProcessingJobName"].startswith("test-sklearn-with-customizations") @@ -220,8 +220,8 @@ def test_sklearn_with_customizations( assert job_description["AppSpecification"]["ContainerArguments"] == ["-v"] assert job_description["AppSpecification"]["ContainerEntrypoint"] == [ - "/bin/bash", - "/opt/ml/processing/input/entrypoint/runproc.sh", + "python3", + "/opt/ml/processing/input/code/dummy_script.py", ] assert job_description["AppSpecification"]["ImageUri"] == image_uri @@ -245,6 +245,7 @@ def test_sklearn_with_custom_default_bucket( sklearn_processor = SKLearnProcessor( framework_version=sklearn_latest_version, role=ROLE, + command=["python3"], instance_type=cpu_instance_type, 
instance_count=1, volume_size_in_gb=100, @@ -287,9 +288,6 @@ def test_sklearn_with_custom_default_bucket( assert job_description["ProcessingInputs"][0]["InputName"] == "dummy_input" assert custom_bucket_name in job_description["ProcessingInputs"][0]["S3Input"]["S3Uri"] - assert job_description["ProcessingInputs"][1]["InputName"] == "code" - assert custom_bucket_name in job_description["ProcessingInputs"][1]["S3Input"]["S3Uri"] - assert job_description["ProcessingInputs"][2]["InputName"] == "entrypoint" assert custom_bucket_name in job_description["ProcessingInputs"][2]["S3Input"]["S3Uri"] @@ -308,8 +306,8 @@ def test_sklearn_with_custom_default_bucket( assert job_description["AppSpecification"]["ContainerArguments"] == ["-v"] assert job_description["AppSpecification"]["ContainerEntrypoint"] == [ - "/bin/bash", - "/opt/ml/processing/input/entrypoint/runproc.sh", + "python3", + "/opt/ml/processing/input/code/dummy_script.py", ] assert job_description["AppSpecification"]["ImageUri"] == image_uri @@ -326,6 +324,7 @@ def test_sklearn_with_no_inputs_or_outputs( sklearn_processor = SKLearnProcessor( framework_version=sklearn_latest_version, role=ROLE, + command=["python3"], instance_type=cpu_instance_type, instance_count=1, volume_size_in_gb=100, @@ -338,16 +337,12 @@ def test_sklearn_with_no_inputs_or_outputs( ) sklearn_processor.run( - code=os.path.join(DATA_DIR, "dummy_script.py"), - arguments=["-v"], - wait=True, - logs=True, + code=os.path.join(DATA_DIR, "dummy_script.py"), arguments=["-v"], wait=True, logs=True ) job_description = sklearn_processor.latest_job.describe() assert job_description["ProcessingInputs"][0]["InputName"] == "code" - assert job_description["ProcessingInputs"][1]["InputName"] == "entrypoint" assert job_description["ProcessingJobName"].startswith("test-sklearn-with-no-inputs") @@ -361,8 +356,8 @@ def test_sklearn_with_no_inputs_or_outputs( assert job_description["AppSpecification"]["ContainerArguments"] == ["-v"] assert 
job_description["AppSpecification"]["ContainerEntrypoint"] == [ - "/bin/bash", - "/opt/ml/processing/input/entrypoint/runproc.sh", + "python3", + "/opt/ml/processing/input/code/dummy_script.py", ] assert job_description["AppSpecification"]["ImageUri"] == image_uri diff --git a/tests/integ/test_sklearn.py b/tests/integ/test_sklearn.py index 201a18e4bf..964e1a22fa 100644 --- a/tests/integ/test_sklearn.py +++ b/tests/integ/test_sklearn.py @@ -46,20 +46,6 @@ def sklearn_training_job( sagemaker_session.boto_region_name -def test_framework_processing_job_with_deps( - sagemaker_session, - sklearn_latest_version, - sklearn_latest_py_version, - cpu_instance_type, -): - return _run_processing_job( - sagemaker_session, - cpu_instance_type, - sklearn_latest_version, - sklearn_latest_py_version, - ) - - def test_training_with_additional_hyperparameters( sagemaker_session, sklearn_latest_version, diff --git a/tests/unit/test_processing.py b/tests/unit/test_processing.py index 75eae7b506..e9815a430c 100644 --- a/tests/unit/test_processing.py +++ b/tests/unit/test_processing.py @@ -88,7 +88,6 @@ def test_sklearn_processor_with_required_parameters( exists_mock, isfile_mock, botocore_resolver, sagemaker_session, sklearn_version ): botocore_resolver.return_value.construct_endpoint.return_value = {"hostname": ECR_HOSTNAME} - processor = SKLearnProcessor( role=ROLE, instance_type="ml.m4.xlarge", @@ -99,13 +98,12 @@ def test_sklearn_processor_with_required_parameters( processor.run(code="/local/path/to/processing_code.py") - expected_args = _get_expected_args_modular_code(processor._current_job_name) + expected_args = _get_expected_args(processor._current_job_name) sklearn_image_uri = ( "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:{}-cpu-py3" ).format(sklearn_version) expected_args["app_specification"]["ImageUri"] = sklearn_image_uri - sagemaker_session.process.assert_called_with(**expected_args) @@ -113,20 +111,18 @@ def 
test_sklearn_processor_with_required_parameters( @patch("os.path.exists", return_value=True) @patch("os.path.isfile", return_value=True) def test_sklearn_with_all_parameters( - exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session, uploaded_code + exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session ): botocore_resolver.return_value.construct_endpoint.return_value = {"hostname": ECR_HOSTNAME} processor = SKLearnProcessor( role=ROLE, framework_version=sklearn_version, - command=["Rscript"], instance_type="ml.m4.xlarge", instance_count=1, volume_size_in_gb=100, volume_kms_key="arn:aws:kms:us-west-2:012345678901:key/volume-kms-key", output_kms_key="arn:aws:kms:us-west-2:012345678901:key/output-kms-key", - code_location=MOCKED_S3_URI, max_runtime_in_seconds=3600, base_job_name="my_sklearn_processor", env={"my_env_variable": "my_env_variable_value"}, @@ -140,21 +136,18 @@ def test_sklearn_with_all_parameters( sagemaker_session=sagemaker_session, ) - with patch("sagemaker.estimator.tar_and_upload_dir", return_value=uploaded_code): - processor.run( - code="processing_code.py", - source_dir="/local/path/to/source_dir", - dependencies=["/local/path/to/dep_01"], - inputs=_get_data_inputs_all_parameters(), - outputs=_get_data_outputs_all_parameters(), - arguments=["--drop-columns", "'SelfEmployed'"], - wait=True, - logs=False, - job_name="my_job_name", - experiment_config={"ExperimentName": "AnExperiment"}, - ) + processor.run( + code="/local/path/to/processing_code.py", + inputs=_get_data_inputs_all_parameters(), + outputs=_get_data_outputs_all_parameters(), + arguments=["--drop-columns", "'SelfEmployed'"], + wait=True, + logs=False, + job_name="my_job_name", + experiment_config={"ExperimentName": "AnExperiment"}, + ) - expected_args = _get_expected_args_all_parameters_modular_code(processor._current_job_name) + expected_args = _get_expected_args_all_parameters(processor._current_job_name) sklearn_image_uri = ( 
"246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:{}-cpu-py3" ).format(sklearn_version) @@ -181,21 +174,18 @@ def test_local_mode_disables_local_code_by_default(localsession_mock): @patch("os.path.exists", return_value=True) @patch("os.path.isfile", return_value=True) def test_sklearn_with_all_parameters_via_run_args( - exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session, uploaded_code + exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session ): botocore_resolver.return_value.construct_endpoint.return_value = {"hostname": ECR_HOSTNAME} - custom_command = ["Rscript"] processor = SKLearnProcessor( role=ROLE, framework_version=sklearn_version, - command=custom_command, instance_type="ml.m4.xlarge", - instance_count=2, + instance_count=1, volume_size_in_gb=100, volume_kms_key="arn:aws:kms:us-west-2:012345678901:key/volume-kms-key", output_kms_key="arn:aws:kms:us-west-2:012345678901:key/output-kms-key", - code_location=MOCKED_S3_URI, max_runtime_in_seconds=3600, base_job_name="my_sklearn_processor", env={"my_env_variable": "my_env_variable_value"}, @@ -209,32 +199,24 @@ def test_sklearn_with_all_parameters_via_run_args( sagemaker_session=sagemaker_session, ) - with patch("sagemaker.estimator.tar_and_upload_dir", return_value=uploaded_code): - run_args = processor.get_run_args( - code="processing_code.py", - source_dir="/local/path/to/source_dir", - dependencies=["/local/path/to/dep_01"], - git_config=None, - inputs=_get_data_inputs_all_parameters(), - outputs=_get_data_outputs_all_parameters(), - arguments=["--drop-columns", "'SelfEmployed'"], - ) - - processor.run( - code=run_args.code, - inputs=run_args.inputs, - outputs=run_args.outputs, - arguments=run_args.arguments, - wait=True, - logs=False, - experiment_config={"ExperimentName": "AnExperiment"}, - ) + run_args = processor.get_run_args( + code="/local/path/to/processing_code.py", + inputs=_get_data_inputs_all_parameters(), + 
outputs=_get_data_outputs_all_parameters(), + arguments=["--drop-columns", "'SelfEmployed'"], + ) - expected_args = _get_expected_args_all_parameters_modular_code( - processor._current_job_name, - instance_count=2, - code_s3_prefix=run_args.code.replace("/runproc.sh", ""), + processor.run( + code=run_args.code, + inputs=run_args.inputs, + outputs=run_args.outputs, + arguments=run_args.arguments, + wait=True, + logs=False, + experiment_config={"ExperimentName": "AnExperiment"}, ) + + expected_args = _get_expected_args_all_parameters(processor._current_job_name) sklearn_image_uri = ( "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:{}-cpu-py3" ).format(sklearn_version) @@ -242,22 +224,12 @@ def test_sklearn_with_all_parameters_via_run_args( sagemaker_session.process.assert_called_with(**expected_args) - # Verify the alternate command was applied successfully: - framework_script = processor._generate_framework_script("processing_code.py") - expected_invocation = f"{' '.join(custom_command)} processing_code.py" - assert ( - f"\n{expected_invocation}" in framework_script - ), "Framework script should contain customized invocation:\n{}\n\nGot:\n{}".format( - expected_invocation, - framework_script, - ) - @patch("sagemaker.utils._botocore_resolver") @patch("os.path.exists", return_value=True) @patch("os.path.isfile", return_value=True) def test_sklearn_with_all_parameters_via_run_args_called_twice( - exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session, uploaded_code + exists_mock, isfile_mock, botocore_resolver, sklearn_version, sagemaker_session ): botocore_resolver.return_value.construct_endpoint.return_value = {"hostname": ECR_HOSTNAME} @@ -269,7 +241,6 @@ def test_sklearn_with_all_parameters_via_run_args_called_twice( volume_size_in_gb=100, volume_kms_key="arn:aws:kms:us-west-2:012345678901:key/volume-kms-key", output_kms_key="arn:aws:kms:us-west-2:012345678901:key/output-kms-key", - code_location=MOCKED_S3_URI, 
max_runtime_in_seconds=3600, base_job_name="my_sklearn_processor", env={"my_env_variable": "my_env_variable_value"}, @@ -283,27 +254,12 @@ def test_sklearn_with_all_parameters_via_run_args_called_twice( sagemaker_session=sagemaker_session, ) - with patch("sagemaker.estimator.tar_and_upload_dir", return_value=uploaded_code): - run_args = processor.get_run_args( - code="processing_code.py", - source_dir="/local/path/to/source_dir", - dependencies=["/local/path/to/dep_01"], - git_config=None, - inputs=_get_data_inputs_all_parameters(), - outputs=_get_data_outputs_all_parameters(), - arguments=["--drop-columns", "'SelfEmployed'"], - ) - run_args = processor.get_run_args( code="/local/path/to/processing_code.py", - source_dir=None, - dependencies=None, - git_config=None, inputs=_get_data_inputs_all_parameters(), outputs=_get_data_outputs_all_parameters(), arguments=["--drop-columns", "'SelfEmployed'"], ) - processor.run( code=run_args.code, inputs=run_args.inputs, @@ -314,10 +270,8 @@ def test_sklearn_with_all_parameters_via_run_args_called_twice( experiment_config={"ExperimentName": "AnExperiment"}, ) - expected_args = _get_expected_args_all_parameters_modular_code( - processor._current_job_name, - code_s3_prefix=run_args.code.replace("/runproc.sh", ""), - ) + expected_args = _get_expected_args_all_parameters(processor._current_job_name) + sklearn_image_uri = ( "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:{}-cpu-py3" ).format(sklearn_version) @@ -853,7 +807,7 @@ def _get_script_processor(sagemaker_session): ) -def _get_expected_args(job_name, code_s3_uri=f"s3://{BUCKET_NAME}"): +def _get_expected_args(job_name, code_s3_uri="s3://mocked_s3_uri_from_upload_data"): return { "inputs": [ {