-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Added handler for pipeline variable while creating process job #5122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
25f16ef
0ed85d6
b69ffcb
8d7f4a8
9321367
f972222
dadbb22
28b3fe8
fe64f82
7775c63
acc861a
16dc02b
06597c6
249872d
58f8746
c6bad70
0bf6404
c67d7df
1f84662
ea1810b
6079269
c9fcefb
16b6f0c
89e18a9
10d4c4f
7f15e19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,9 +60,10 @@ | |
) | ||
from sagemaker.workflow import is_pipeline_variable | ||
from sagemaker.workflow.entities import PipelineVariable | ||
from sagemaker.workflow.execution_variables import ExecutionVariables | ||
from sagemaker.workflow.execution_variables import ExecutionVariable, ExecutionVariables | ||
from sagemaker.workflow.functions import Join | ||
from sagemaker.workflow.pipeline_context import runnable_by_pipeline | ||
from sagemaker.workflow.parameters import Parameter | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -314,6 +315,15 @@ def _normalize_args( | |
"code argument has to be a valid S3 URI or local file path " | ||
+ "rather than a pipeline variable" | ||
) | ||
if arguments is not None: | ||
normalized_arguments = [] | ||
for arg in arguments: | ||
if isinstance(arg, PipelineVariable): | ||
normalized_value = self._normalize_pipeline_variable(arg) | ||
normalized_arguments.append(normalized_value) | ||
else: | ||
normalized_arguments.append(str(arg)) | ||
arguments = normalized_arguments | ||
|
||
self._current_job_name = self._generate_current_job_name(job_name=job_name) | ||
|
||
|
@@ -499,6 +509,37 @@ def _normalize_outputs(self, outputs=None): | |
normalized_outputs.append(output) | ||
return normalized_outputs | ||
|
||
def _normalize_pipeline_variable(self, value): | ||
"""Helper function to normalize PipelineVariable objects""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does normalize mean in this case ? Is this like a |
||
try: | ||
if isinstance(value, Parameter): | ||
return str(value.default_value) if value.default_value is not None else None | ||
|
||
elif isinstance(value, ExecutionVariable): | ||
return f"{value.name}" | ||
|
||
elif isinstance(value, Join): | ||
normalized_values = [ | ||
normalize_pipeline_variable(v) if isinstance(v, PipelineVariable) else str(v) | ||
for v in value.values | ||
] | ||
return value.on.join(normalized_values) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this function is only called on value being |
||
|
||
elif isinstance(value, PipelineVariable): | ||
if hasattr(value, 'default_value'): | ||
return str(value.default_value) | ||
elif hasattr(value, 'expr'): | ||
return str(value.expr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we instead overload the |
||
|
||
return str(value) | ||
|
||
except AttributeError as e: | ||
raise ValueError(f"Missing required attribute while normalizing {type(value).__name__}: {e}") | ||
except TypeError as e: | ||
raise ValueError(f"Type error while normalizing {type(value).__name__}: {e}") | ||
except Exception as e: | ||
raise ValueError(f"Error normalizing {type(value).__name__}: {e}") | ||
|
||
|
||
class ScriptProcessor(Processor): | ||
"""Handles Amazon SageMaker processing tasks for jobs using a machine learning framework.""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add data types and return types