Skip to content

Added handler for pipeline variable while creating process job #5122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
25f16ef
change: Allow telemetry only in supported regions
Jan 29, 2025
0ed85d6
change: Allow telemetry only in supported regions
Jan 29, 2025
b69ffcb
change: Allow telemetry only in supported regions
Jan 29, 2025
8d7f4a8
change: Allow telemetry only in supported regions
Jan 29, 2025
9321367
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Jan 29, 2025
f972222
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Jan 30, 2025
dadbb22
change: Allow telemetry only in supported regions
Jan 30, 2025
28b3fe8
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Feb 23, 2025
fe64f82
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Feb 24, 2025
7775c63
documentation: Removed a line about python version requirements of tr…
Feb 24, 2025
acc861a
Merge branch 'master' into rsareddy-dev
rsareddy0329 Feb 24, 2025
16dc02b
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 10, 2025
06597c6
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 11, 2025
249872d
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 12, 2025
58f8746
feature: Enabled update_endpoint through model_builder
Mar 12, 2025
c6bad70
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 12, 2025
0bf6404
fix: fix unit test, black-check, pylint errors
Mar 12, 2025
c67d7df
fix: fix black-check, pylint errors
Mar 12, 2025
1f84662
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 13, 2025
ea1810b
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 14, 2025
6079269
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 14, 2025
c9fcefb
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 17, 2025
16b6f0c
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 8, 2025
89e18a9
fix:Added handler for pipeline variable while creating process job
Apr 8, 2025
10d4c4f
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 9, 2025
7f15e19
fix: Added handler for pipeline variable while creating process job
Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/sagemaker/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@
)
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.execution_variables import ExecutionVariable, ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.workflow.parameters import Parameter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -314,6 +315,15 @@ def _normalize_args(
"code argument has to be a valid S3 URI or local file path "
+ "rather than a pipeline variable"
)
if arguments is not None:
normalized_arguments = []
for arg in arguments:
if isinstance(arg, PipelineVariable):
normalized_value = self._normalize_pipeline_variable(arg)
normalized_arguments.append(normalized_value)
else:
normalized_arguments.append(str(arg))
arguments = normalized_arguments

self._current_job_name = self._generate_current_job_name(job_name=job_name)

Expand Down Expand Up @@ -499,6 +509,37 @@ def _normalize_outputs(self, outputs=None):
normalized_outputs.append(output)
return normalized_outputs

def _normalize_pipeline_variable(self, value) -> "str | None":
    """Resolve a pipeline variable to a plain string usable in a processing-job request.

    Pipeline variables (``Parameter``, ``ExecutionVariable``, ``Join``, other
    ``PipelineVariable`` subclasses) cannot be serialized directly into a
    CreateProcessingJob request, so this helper converts them to their best
    static string representation. Non-pipeline values fall back to ``str(value)``.

    Args:
        value: The value to normalize; typically a ``PipelineVariable``
            subclass, but any object is accepted.

    Returns:
        str | None: The normalized string, or ``None`` for a ``Parameter``
        whose ``default_value`` is ``None``.

    Raises:
        ValueError: If normalization fails because of a missing attribute or
            an incompatible type.
    """
    try:
        if isinstance(value, Parameter):
            return str(value.default_value) if value.default_value is not None else None

        if isinstance(value, ExecutionVariable):
            return f"{value.name}"

        if isinstance(value, Join):
            # BUG FIX: the original recursed through the unbound name
            # ``normalize_pipeline_variable`` (a NameError at runtime, masked
            # by the broad except below); recurse via the bound method instead.
            normalized_values = [
                self._normalize_pipeline_variable(v) if isinstance(v, PipelineVariable) else str(v)
                for v in value.values
            ]
            return value.on.join(normalized_values)

        if isinstance(value, PipelineVariable):
            # Generic fallback for other PipelineVariable subclasses: prefer a
            # concrete default value, then the variable's expression dict.
            if hasattr(value, "default_value"):
                return str(value.default_value)
            if hasattr(value, "expr"):
                return str(value.expr)

        return str(value)

    except AttributeError as e:
        raise ValueError(
            f"Missing required attribute while normalizing {type(value).__name__}: {e}"
        ) from e
    except TypeError as e:
        raise ValueError(f"Type error while normalizing {type(value).__name__}: {e}") from e
    except Exception as e:  # pylint: disable=broad-except
        raise ValueError(f"Error normalizing {type(value).__name__}: {e}") from e


class ScriptProcessor(Processor):
"""Handles Amazon SageMaker processing tasks for jobs using a machine learning framework."""
Expand Down
255 changes: 254 additions & 1 deletion tests/unit/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@
from sagemaker.fw_utils import UploadedCode
from sagemaker.workflow.pipeline_context import PipelineSession, _PipelineConfig
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.execution_variables import ExecutionVariable, ExecutionVariables
from tests.unit import SAGEMAKER_CONFIG_PROCESSING_JOB
from sagemaker.workflow.parameters import ParameterString, Parameter

BUCKET_NAME = "mybucket"
REGION = "us-west-2"
Expand Down Expand Up @@ -1717,3 +1718,255 @@ def _get_describe_response_inputs_and_ouputs():
"ProcessingInputs": _get_expected_args_all_parameters(None)["inputs"],
"ProcessingOutputConfig": _get_expected_args_all_parameters(None)["output_config"],
}

# Parameters
def _get_data_inputs_with_parameters():
    """Return a one-element input list whose S3 source is a ParameterString."""
    source_param = ParameterString(name="input_data", default_value="s3://dummy-bucket/input")
    processing_input = ProcessingInput(
        source=source_param,
        destination="/opt/ml/processing/input",
        input_name="input-1",
    )
    return [processing_input]


def _get_data_outputs_with_parameters():
    """Return a one-element output list whose S3 destination is a ParameterString."""
    destination_param = ParameterString(name="output_data", default_value="s3://dummy-bucket/output")
    processing_output = ProcessingOutput(
        source="/opt/ml/processing/output",
        destination=destination_param,
        output_name="output-1",
    )
    return [processing_output]


def _get_expected_args_with_parameters(job_name):
return {
"inputs": [{
"InputName": "input-1",
"S3Input": {
"S3Uri": "s3://dummy-bucket/input",
"LocalPath": "/opt/ml/processing/input",
"S3DataType": "S3Prefix",
"S3InputMode": "File",
"S3DataDistributionType": "FullyReplicated",
"S3CompressionType": "None"
}
}],
"output_config": {
"Outputs": [{
"OutputName": "output-1",
"S3Output": {
"S3Uri": "s3://dummy-bucket/output",
"LocalPath": "/opt/ml/processing/output",
"S3UploadMode": "EndOfJob"
}
}]
},
"job_name": job_name,
"resources": {
"ClusterConfig": {
"InstanceType": "ml.m4.xlarge",
"InstanceCount": 1,
"VolumeSizeInGB": 100,
"VolumeKmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/volume-kms-key"
}
},
"stopping_condition": {"MaxRuntimeInSeconds": 3600},
"app_specification": {
"ImageUri": "custom-image-uri",
"ContainerArguments": [
"--input-data",
"s3://dummy-bucket/input-param",
"--output-path",
"s3://dummy-bucket/output-param"
],
"ContainerEntrypoint": ["python3"]
},
"environment": {"my_env_variable": "my_env_variable_value"},
"network_config": {
"EnableNetworkIsolation": True,
"EnableInterContainerTrafficEncryption": True,
"VpcConfig": {
"Subnets": ["my_subnet_id"],
"SecurityGroupIds": ["my_security_group_id"]
}
},
"role_arn": "dummy/role",
"tags": [{"Key": "my-tag", "Value": "my-tag-value"}],
"experiment_config": {"ExperimentName": "AnExperiment"}
}


@patch("os.path.exists", return_value=True)
@patch("os.path.isfile", return_value=True)
@patch("sagemaker.utils.repack_model")
@patch("sagemaker.utils.create_tar_file")
@patch("sagemaker.session.Session.upload_data")
def test_script_processor_with_parameter_string(
    upload_data_mock,
    create_tar_file_mock,
    repack_model_mock,
    exists_mock,
    isfile_mock,
    sagemaker_session,
):
    """Test ScriptProcessor with ParameterString arguments"""
    # Verifies that every flavor of pipeline variable passed via ``arguments``
    # (ParameterString, ExecutionVariable, Join) plus a plain string is
    # normalized to a static string in the ContainerArguments sent to
    # sagemaker_session.process(). The session itself is a mocked fixture, so
    # no AWS calls are made; code upload is stubbed to a fixed S3 URI.
    upload_data_mock.return_value = "s3://mocked_s3_uri_from_upload_data"

    # Setup processor
    processor = ScriptProcessor(
        role="arn:aws:iam::012345678901:role/SageMakerRole",  # Updated role ARN
        image_uri="custom-image-uri",
        command=["python3"],
        instance_type="ml.m4.xlarge",
        instance_count=1,
        volume_size_in_gb=100,
        volume_kms_key="arn:aws:kms:us-west-2:012345678901:key/volume-kms-key",
        output_kms_key="arn:aws:kms:us-west-2:012345678901:key/output-kms-key",
        max_runtime_in_seconds=3600,
        base_job_name="test_processor",
        env={"my_env_variable": "my_env_variable_value"},
        tags=[{"Key": "my-tag", "Value": "my-tag-value"}],
        network_config=NetworkConfig(
            subnets=["my_subnet_id"],
            security_group_ids=["my_security_group_id"],
            enable_network_isolation=True,
            encrypt_inter_container_traffic=True,
        ),
        sagemaker_session=sagemaker_session,
    )

    # One argument of each pipeline-variable type; the expected args below
    # assert each one's normalized (stringified) form.
    input_param = ParameterString(
        name="input_param",
        default_value="s3://dummy-bucket/input-param"
    )
    output_param = ParameterString(
        name="output_param",
        default_value="s3://dummy-bucket/output-param"
    )
    exec_var = ExecutionVariable(
        name="ExecutionTest"
    )
    join_var = Join(
        on="/",
        values=["s3://bucket", "prefix", "file.txt"]
    )
    dummy_str_var = "test-variable"

    # Define expected arguments
    expected_args = {
        "inputs": [
            {
                "InputName": "input-1",
                "AppManaged": False,
                # NOTE(review): S3Uri here stays a ParameterString object —
                # only ``arguments`` are normalized to strings, not inputs.
                "S3Input": {
                    "S3Uri": ParameterString(
                        name="input_data",
                        default_value="s3://dummy-bucket/input"
                    ),
                    "LocalPath": "/opt/ml/processing/input",
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                    "S3DataDistributionType": "FullyReplicated",
                    "S3CompressionType": "None"
                }
            },
            {
                "InputName": "code",
                "AppManaged": False,
                "S3Input": {
                    "S3Uri": "s3://mocked_s3_uri_from_upload_data",
                    "LocalPath": "/opt/ml/processing/input/code",
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                    "S3DataDistributionType": "FullyReplicated",
                    "S3CompressionType": "None"
                }
            }
        ],
        "output_config": {
            "Outputs": [
                {
                    "OutputName": "output-1",
                    "AppManaged": False,
                    "S3Output": {
                        "S3Uri": ParameterString(
                            name="output_data",
                            default_value="s3://dummy-bucket/output"
                        ),
                        "LocalPath": "/opt/ml/processing/output",
                        "S3UploadMode": "EndOfJob"
                    }
                }
            ],
            "KmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/output-kms-key"
        },
        "job_name": "test_job",
        "resources": {
            "ClusterConfig": {
                "InstanceType": "ml.m4.xlarge",
                "InstanceCount": 1,
                "VolumeSizeInGB": 100,
                "VolumeKmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/volume-kms-key"
            }
        },
        "stopping_condition": {"MaxRuntimeInSeconds": 3600},
        "app_specification": {
            "ImageUri": "custom-image-uri",
            # Each pipeline variable resolved to its static string form:
            # ParameterString -> default_value, ExecutionVariable -> name,
            # Join -> values joined on the separator.
            "ContainerArguments": [
                "--input-data",
                "s3://dummy-bucket/input-param",
                "--output-path",
                "s3://dummy-bucket/output-param",
                "--exec-arg", "ExecutionTest",
                "--join-arg", "s3://bucket/prefix/file.txt",
                "--string-param", "test-variable"
            ],
            "ContainerEntrypoint": ["python3", "/opt/ml/processing/input/code/processing_code.py"]
        },
        "environment": {"my_env_variable": "my_env_variable_value"},
        "network_config": {
            "EnableNetworkIsolation": True,
            "EnableInterContainerTrafficEncryption": True,
            "VpcConfig": {
                "SecurityGroupIds": ["my_security_group_id"],
                "Subnets": ["my_subnet_id"]
            }
        },
        "role_arn": "arn:aws:iam::012345678901:role/SageMakerRole",
        "tags": [{"Key": "my-tag", "Value": "my-tag-value"}],
        "experiment_config": {"ExperimentName": "AnExperiment"}
    }

    # Run processor
    processor.run(
        code="/local/path/to/processing_code.py",
        inputs=_get_data_inputs_with_parameters(),
        outputs=_get_data_outputs_with_parameters(),
        arguments=[
            "--input-data",
            input_param,
            "--output-path",
            output_param,
            "--exec-arg", exec_var,
            "--join-arg", join_var,
            "--string-param", dummy_str_var
        ],
        wait=True,
        logs=False,
        job_name="test_job",
        experiment_config={"ExperimentName": "AnExperiment"},
    )

    # Assert
    sagemaker_session.process.assert_called_with(**expected_args)
    assert "test_job" in processor._current_job_name


Loading