-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Accept step object in dependson list #2504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 28 commits
868db55
21bedbb
854dd10
8798b65
69ae4bd
a6a8449
2aa256e
06557a8
dcbfd13
fc6522e
b6f9371
86fa47d
05ccfa6
0716e9f
7ca5af4
8cf18b8
1f95b82
1b9d66b
5bc47bd
603b934
664f2a8
a8755ec
e25d36c
a9cfab4
c0066ea
e9ac9fa
eb6a523
450e4a5
3b75821
0eaf41b
dad08c4
5c45e2c
a7780ce
65f9426
e3579c5
cd1f731
a4d684a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,12 +63,13 @@ class Step(Entity): | |
Attributes: | ||
name (str): The name of the step. | ||
step_type (StepTypeEnum): The type of the step. | ||
depends_on (List[str]): The list of step names the current step depends on | ||
depends_on (List[str] or List[Step]): The list of step names or step | ||
instances the current step depends on | ||
""" | ||
|
||
name: str = attr.ib(factory=str) | ||
step_type: StepTypeEnum = attr.ib(factory=StepTypeEnum.factory) | ||
depends_on: List[str] = attr.ib(default=None) | ||
depends_on: Union[List[str], List["Step"]] = attr.ib(default=None) | ||
|
||
@property | ||
@abc.abstractmethod | ||
|
@@ -88,11 +89,13 @@ def to_request(self) -> RequestType: | |
"Arguments": self.arguments, | ||
} | ||
if self.depends_on: | ||
request_dict["DependsOn"] = self.depends_on | ||
request_dict["DependsOn"] = self._resolve_depends_on(self.depends_on) | ||
|
||
return request_dict | ||
|
||
def add_depends_on(self, step_names: List[str]): | ||
"""Add step names to the current step depends on list""" | ||
def add_depends_on(self, step_names: Union[List[str], List["Step"]]): | ||
"""Add step names or step instances to the current step depends on list""" | ||
|
||
if not step_names: | ||
return | ||
|
||
|
@@ -105,6 +108,19 @@ def ref(self) -> Dict[str, str]: | |
"""Gets a reference dict for steps""" | ||
return {"Name": self.name} | ||
|
||
@staticmethod | ||
def _resolve_depends_on(depends_on_list: Union[List[str], List["Step"]]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List["Step"] -> List[Step] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in py37 in order to infer type to self class, we need to do |
||
"""Resolver the step depends on list""" | ||
depends_on = [] | ||
for step in depends_on_list: | ||
if isinstance(step, Step): | ||
depends_on.append(step.name) | ||
elif isinstance(step, str): | ||
depends_on.append(step) | ||
else: | ||
raise ValueError(f"Invalid input step name: {step}") | ||
return depends_on | ||
|
||
|
||
@attr.s | ||
class CacheConfig: | ||
|
@@ -147,7 +163,7 @@ def __init__( | |
estimator: EstimatorBase, | ||
inputs: Union[TrainingInput, dict, str, FileSystemInput] = None, | ||
cache_config: CacheConfig = None, | ||
depends_on: List[str] = None, | ||
depends_on: Union[List[str], List[Step]] = None, | ||
): | ||
"""Construct a TrainingStep, given an `EstimatorBase` instance. | ||
|
||
|
@@ -175,8 +191,8 @@ def __init__( | |
the path to the training dataset. | ||
|
||
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. | ||
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` | ||
depends on | ||
depends_on (List[str] or List[Step]): A list of step names or step instances | ||
this `sagemaker.workflow.steps.TrainingStep` depends on | ||
""" | ||
super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING, depends_on) | ||
self.estimator = estimator | ||
|
@@ -221,7 +237,11 @@ class CreateModelStep(Step): | |
"""CreateModel step for workflow.""" | ||
|
||
def __init__( | ||
self, name: str, model: Model, inputs: CreateModelInput, depends_on: List[str] = None | ||
self, | ||
name: str, | ||
model: Model, | ||
inputs: CreateModelInput, | ||
depends_on: Union[List[str], List[Step]] = None, | ||
): | ||
"""Construct a CreateModelStep, given an `sagemaker.model.Model` instance. | ||
|
||
|
@@ -233,8 +253,8 @@ def __init__( | |
model (Model): A `sagemaker.model.Model` instance. | ||
inputs (CreateModelInput): A `sagemaker.inputs.CreateModelInput` instance. | ||
Defaults to `None`. | ||
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep` | ||
depends on | ||
depends_on (List[str] or List[Step]): A list of step names or step instances | ||
this `sagemaker.workflow.steps.CreateModelStep` depends on | ||
""" | ||
super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL, depends_on) | ||
self.model = model | ||
|
@@ -279,7 +299,7 @@ def __init__( | |
transformer: Transformer, | ||
inputs: TransformInput, | ||
cache_config: CacheConfig = None, | ||
depends_on: List[str] = None, | ||
depends_on: Union[List[str], List[Step]] = None, | ||
): | ||
"""Constructs a TransformStep, given an `Transformer` instance. | ||
|
||
|
@@ -291,8 +311,8 @@ def __init__( | |
transformer (Transformer): A `sagemaker.transformer.Transformer` instance. | ||
inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance. | ||
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. | ||
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep` | ||
depends on | ||
depends_on (List[str] or List[Step]): A list of step names or step instances | ||
this `sagemaker.workflow.steps.TransformStep` depends on | ||
""" | ||
super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM, depends_on) | ||
self.transformer = transformer | ||
|
@@ -355,7 +375,7 @@ def __init__( | |
code: str = None, | ||
property_files: List[PropertyFile] = None, | ||
cache_config: CacheConfig = None, | ||
depends_on: List[str] = None, | ||
depends_on: Union[List[str], List[Step]] = None, | ||
): | ||
"""Construct a ProcessingStep, given a `Processor` instance. | ||
|
||
|
@@ -376,8 +396,8 @@ def __init__( | |
property_files (List[PropertyFile]): A list of property files that workflow looks | ||
for and resolves from the configured processing output list. | ||
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. | ||
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep` | ||
depends on | ||
depends_on (List[str] or List[Step]): A list of step names or step instance | ||
this `sagemaker.workflow.steps.ProcessingStep` depends on | ||
""" | ||
super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING, depends_on) | ||
self.processor = processor | ||
|
@@ -445,7 +465,7 @@ def __init__( | |
inputs=None, | ||
job_arguments: List[str] = None, | ||
cache_config: CacheConfig = None, | ||
depends_on: List[str] = None, | ||
depends_on: Union[List[str], List[Step]] = None, | ||
): | ||
"""Construct a TuningStep, given a `HyperparameterTuner` instance. | ||
|
||
|
@@ -485,8 +505,8 @@ def __init__( | |
job_arguments (List[str]): A list of strings to be passed into the processing job. | ||
Defaults to `None`. | ||
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. | ||
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep` | ||
depends on | ||
depends_on (List[str] or List[Step]): A list of step names or step instance | ||
this `sagemaker.workflow.steps.ProcessingStep` depends on | ||
""" | ||
super(TuningStep, self).__init__(name, StepTypeEnum.TUNING, depends_on) | ||
self.tuner = tuner | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List["Step"] -> List[Step]