Skip to content

Accept step object in dependson list #2504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
868db55
add helper function to generate no-op (data ingestion only) recipe
jerrypeng7773 May 11, 2021
21bedbb
Merge branch 'aws:master' into master
jerrypeng7773 May 11, 2021
854dd10
separate flow generation by source input type + move generation helpe…
jerrypeng7773 May 11, 2021
8798b65
Merge branch 'aws:master' into master
jerrypeng7773 May 11, 2021
69ae4bd
create an internal helper function to generate output node
jerrypeng7773 May 12, 2021
a6a8449
Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk
jerrypeng7773 May 12, 2021
2aa256e
Merge branch 'aws:master' into master
jerrypeng7773 May 18, 2021
06557a8
add ingestion test using dw processor via pipeline execution
jerrypeng7773 May 19, 2021
dcbfd13
Merge branch 'aws:master' into master
jerrypeng7773 May 19, 2021
fc6522e
verify the fg query df
jerrypeng7773 May 19, 2021
b6f9371
Merge branch 'master' into master
ahsan-z-khan May 19, 2021
86fa47d
fix tests
jerrypeng7773 May 19, 2021
05ccfa6
Merge branch 'master' into master
ahsan-z-khan May 20, 2021
0716e9f
Merge branch 'aws:master' into master
jerrypeng7773 Jun 14, 2021
7ca5af4
add tuning step support
jerrypeng7773 Jun 24, 2021
8cf18b8
fix docstyle check
jerrypeng7773 Jun 24, 2021
1f95b82
add helper function to get tuning step top performing model s3 uri
jerrypeng7773 Jun 29, 2021
1b9d66b
Merge branch 'aws:master' into master
jerrypeng7773 Jun 30, 2021
5bc47bd
allow step depends on pass in step instance
jerrypeng7773 Jun 30, 2021
603b934
Merge branch 'aws:master' into master
jerrypeng7773 Jun 30, 2021
664f2a8
Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk
jerrypeng7773 Jun 30, 2021
a8755ec
Merge branch 'master' into master
apogupta2018 Jul 1, 2021
e25d36c
Merge branch 'aws:master' into master
jerrypeng7773 Jul 1, 2021
a9cfab4
Merge branch 'master' into accept-step-object-in-dependson-list
jerrypeng7773 Jul 1, 2021
c0066ea
resolve merge conflict
jerrypeng7773 Jul 1, 2021
e9ac9fa
support passing step object to tuning step depends on list
jerrypeng7773 Jul 1, 2021
eb6a523
fix test_workflow_with_clarify
jerrypeng7773 Jul 1, 2021
450e4a5
allow step instance in depends on list for repack and register model …
jerrypeng7773 Jul 6, 2021
3b75821
Merge branch 'master' into accept-step-object-in-dependson-list
ahsan-z-khan Jul 12, 2021
0eaf41b
Merge branch 'master' of https://github.com/aws/sagemaker-python-sdk …
jerrypeng7773 Jul 14, 2021
dad08c4
fix formatting
jerrypeng7773 Jul 14, 2021
5c45e2c
Merge branch 'master' into accept-step-object-in-dependson-list
ahsan-z-khan Jul 18, 2021
a7780ce
Merge branch 'master' into accept-step-object-in-dependson-list
shreyapandit Jul 20, 2021
65f9426
Merge branch 'master' into accept-step-object-in-dependson-list
shreyapandit Aug 2, 2021
e3579c5
Merge branch 'master' into accept-step-object-in-dependson-list
ahsan-z-khan Aug 2, 2021
cd1f731
Merge branch 'master' into accept-step-object-in-dependson-list
ahsan-z-khan Aug 3, 2021
a4d684a
Merge branch 'master' into accept-step-object-in-dependson-list
shreyapandit Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/workflows/pipelines/sagemaker.workflow.pipelines.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ ConditionStep
-------------

.. autoclass:: sagemaker.workflow.condition_step.ConditionStep

.. autoclass:: sagemaker.workflow.condition_step.JsonGet

Conditions
Expand Down
42 changes: 23 additions & 19 deletions src/sagemaker/workflow/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import shutil
import tarfile
import tempfile
from typing import List

from typing import List, Union
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.s3 import (
Expand Down Expand Up @@ -61,7 +60,7 @@ def __init__(
entry_point: str,
source_dir: str = None,
dependencies: List = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
**kwargs,
):
"""Constructs a TrainingStep, given an `EstimatorBase` instance.
Expand Down Expand Up @@ -230,7 +229,7 @@ def __init__(
image_uri=None,
compile_model_family=None,
description=None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
tags=None,
container_def_list=None,
**kwargs,
Expand All @@ -239,30 +238,35 @@ def __init__(

Args:
name (str): The name of the training step.
step_type (StepTypeEnum): The type of the step with value `StepTypeEnum.Training`.
step_type (StepTypeEnum): The type of the step with value
`StepTypeEnum.Training`.
estimator (EstimatorBase): A `sagemaker.estimator.EstimatorBase` instance.
model_data: the S3 URI to the model data from training.
content_types (list): The supported MIME types for the input data (default: None).
response_types (list): The supported MIME types for the output data (default: None).
content_types (list): The supported MIME types for the
input data (default: None).
response_types (list): The supported MIME types for
the output data (default: None).
inference_instances (list): A list of the instance types that are used to
generate inferences in real-time (default: None).
transform_instances (list): A list of the instance types on which a transformation
job can be run or on which an endpoint can be deployed (default: None).
transform_instances (list): A list of the instance types on which a
transformation job can be run or on which an endpoint
can be deployed (default: None).
model_package_group_name (str): Model Package Group name, exclusive to
`model_package_name`, using `model_package_group_name` makes the Model Package
versioned (default: None).
`model_package_name`, using `model_package_group_name`
makes the Model Package versioned (default: None).
model_metrics (ModelMetrics): ModelMetrics object (default: None).
metadata_properties (MetadataProperties): MetadataProperties object (default: None).
approval_status (str): Model Approval Status, values can be "Approved", "Rejected",
or "PendingManualApproval" (default: "PendingManualApproval").
metadata_properties (MetadataProperties): MetadataProperties object
(default: None).
approval_status (str): Model Approval Status, values can be "Approved",
"Rejected", or "PendingManualApproval"
(default: "PendingManualApproval").
image_uri (str): The container image uri for Model Package, if not specified,
Estimator's training container image will be used (default: None).
compile_model_family (str): Instance family for compiled model, if specified, a compiled
model will be used (default: None).
compile_model_family (str): Instance family for compiled model,
if specified, a compiled model will be used (default: None).
description (str): Model Package description (default: None).
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep`
depends on
container_def_list (list): A list of containers.
depends_on (List[str] or List[Step]): A list of step names or instances
this step depends on
**kwargs: additional arguments to `create_model`.
"""
super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL, depends_on)
Expand Down
8 changes: 4 additions & 4 deletions src/sagemaker/workflow/callback_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""The step definitions for workflow."""
from __future__ import absolute_import

from typing import List, Dict
from typing import List, Dict, Union
from enum import Enum

import attr
Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(
inputs: dict,
outputs: List[CallbackOutput],
cache_config: CacheConfig = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
):
"""Constructs a CallbackStep.

Expand All @@ -95,8 +95,8 @@ def __init__(
in the SQS message body of callback messages.
outputs (List[CallbackOutput]): Outputs that can be provided when completing a callback.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.CallbackStep` depends on
"""
super(CallbackStep, self).__init__(name, StepTypeEnum.CALLBACK, depends_on)
self.sqs_queue_url = sqs_queue_url
Expand Down
14 changes: 7 additions & 7 deletions src/sagemaker/workflow/condition_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
import attr

from sagemaker.workflow.conditions import Condition
from sagemaker.workflow.steps import (
Step,
StepTypeEnum,
)
from sagemaker.workflow.step_collections import StepCollection
from sagemaker.workflow.utilities import list_to_request
from sagemaker.workflow.entities import (
Expression,
RequestType,
Expand All @@ -26,12 +32,6 @@
Properties,
PropertyFile,
)
from sagemaker.workflow.steps import (
Step,
StepTypeEnum,
)
from sagemaker.workflow.step_collections import StepCollection
from sagemaker.workflow.utilities import list_to_request


class ConditionStep(Step):
Expand All @@ -40,7 +40,7 @@ class ConditionStep(Step):
def __init__(
self,
name: str,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
conditions: List[Condition] = None,
if_steps: List[Union[Step, StepCollection]] = None,
else_steps: List[Union[Step, StepCollection]] = None,
Expand Down
14 changes: 7 additions & 7 deletions src/sagemaker/workflow/step_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""The step definitions for workflow."""
from __future__ import absolute_import

from typing import List
from typing import List, Union

import attr

Expand Down Expand Up @@ -61,7 +61,7 @@ def __init__(
transform_instances,
estimator: EstimatorBase = None,
model_data=None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
model_package_group_name=None,
model_metrics=None,
approval_status=None,
Expand All @@ -84,8 +84,8 @@ def __init__(
generate inferences in real-time (default: None).
transform_instances (list): A list of the instance types on which a transformation
job can be run or on which an endpoint can be deployed (default: None).
depends_on (List[str]): The list of step names the first step in the collection
depends on
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
model_package_group_name (str): The Model Package Group name, exclusive to
`model_package_name`, using `model_package_group_name` makes the Model Package
versioned (default: None).
Expand Down Expand Up @@ -226,7 +226,7 @@ def __init__(
max_payload=None,
tags=None,
volume_kms_key=None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
**kwargs,
):
"""Construct steps required for a Transformer step collection:
Expand Down Expand Up @@ -263,8 +263,8 @@ def __init__(
it will be the format of the batch transform output.
env (dict): The Environment variables to be set for use during the
transform job (default: None).
depends_on (List[str]): The list of step names the first step in
the collection depends on
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
"""
steps = []
if "entry_point" in kwargs:
Expand Down
62 changes: 41 additions & 21 deletions src/sagemaker/workflow/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ class Step(Entity):
Attributes:
name (str): The name of the step.
step_type (StepTypeEnum): The type of the step.
depends_on (List[str]): The list of step names the current step depends on
depends_on (List[str] or List[Step]): The list of step names or step
instances the current step depends on
"""

name: str = attr.ib(factory=str)
step_type: StepTypeEnum = attr.ib(factory=StepTypeEnum.factory)
depends_on: List[str] = attr.ib(default=None)
depends_on: Union[List[str], List["Step"]] = attr.ib(default=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List["Step"] -> List[Step]


@property
@abc.abstractmethod
Expand All @@ -88,11 +89,13 @@ def to_request(self) -> RequestType:
"Arguments": self.arguments,
}
if self.depends_on:
request_dict["DependsOn"] = self.depends_on
request_dict["DependsOn"] = self._resolve_depends_on(self.depends_on)

return request_dict

def add_depends_on(self, step_names: List[str]):
"""Add step names to the current step depends on list"""
def add_depends_on(self, step_names: Union[List[str], List["Step"]]):
"""Add step names or step instances to the current step depends on list"""

if not step_names:
return

Expand All @@ -105,6 +108,19 @@ def ref(self) -> Dict[str, str]:
"""Gets a reference dict for steps"""
return {"Name": self.name}

@staticmethod
def _resolve_depends_on(depends_on_list: Union[List[str], List["Step"]]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List["Step"] -> List[Step]

Copy link
Contributor Author

@jerrypeng7773 jerrypeng7773 Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python 3.7, annotating a method with the type of its enclosing class (without quoting the name) requires `from __future__ import annotations`, and that import is not supported in Python 3.6. The Python SDK supports both 3.6 and 3.7, so I think we need to stick with the string "Step" for a while.

https://stackoverflow.com/questions/33533148/how-do-i-type-hint-a-method-with-the-type-of-the-enclosing-class

"""Resolver the step depends on list"""
depends_on = []
for step in depends_on_list:
if isinstance(step, Step):
depends_on.append(step.name)
elif isinstance(step, str):
depends_on.append(step)
else:
raise ValueError(f"Invalid input step name: {step}")
return depends_on


@attr.s
class CacheConfig:
Expand Down Expand Up @@ -147,7 +163,7 @@ def __init__(
estimator: EstimatorBase,
inputs: Union[TrainingInput, dict, str, FileSystemInput] = None,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
):
"""Construct a TrainingStep, given an `EstimatorBase` instance.

Expand Down Expand Up @@ -175,8 +191,8 @@ def __init__(
the path to the training dataset.

cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.TrainingStep` depends on
"""
super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING, depends_on)
self.estimator = estimator
Expand Down Expand Up @@ -221,7 +237,11 @@ class CreateModelStep(Step):
"""CreateModel step for workflow."""

def __init__(
self, name: str, model: Model, inputs: CreateModelInput, depends_on: List[str] = None
self,
name: str,
model: Model,
inputs: CreateModelInput,
depends_on: Union[List[str], List[Step]] = None,
):
"""Construct a CreateModelStep, given an `sagemaker.model.Model` instance.

Expand All @@ -233,8 +253,8 @@ def __init__(
model (Model): A `sagemaker.model.Model` instance.
inputs (CreateModelInput): A `sagemaker.inputs.CreateModelInput` instance.
Defaults to `None`.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.CreateModelStep` depends on
"""
super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL, depends_on)
self.model = model
Expand Down Expand Up @@ -279,7 +299,7 @@ def __init__(
transformer: Transformer,
inputs: TransformInput,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
):
"""Constructs a TransformStep, given an `Transformer` instance.

Expand All @@ -291,8 +311,8 @@ def __init__(
transformer (Transformer): A `sagemaker.transformer.Transformer` instance.
inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.TransformStep` depends on
"""
super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM, depends_on)
self.transformer = transformer
Expand Down Expand Up @@ -355,7 +375,7 @@ def __init__(
code: str = None,
property_files: List[PropertyFile] = None,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
):
"""Construct a ProcessingStep, given a `Processor` instance.

Expand All @@ -376,8 +396,8 @@ def __init__(
property_files (List[PropertyFile]): A list of property files that workflow looks
for and resolves from the configured processing output list.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.ProcessingStep` depends on
"""
super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING, depends_on)
self.processor = processor
Expand Down Expand Up @@ -445,7 +465,7 @@ def __init__(
inputs=None,
job_arguments: List[str] = None,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
depends_on: Union[List[str], List[Step]] = None,
):
"""Construct a TuningStep, given a `HyperparameterTuner` instance.

Expand Down Expand Up @@ -485,8 +505,8 @@ def __init__(
job_arguments (List[str]): A list of strings to be passed into the processing job.
Defaults to `None`.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep`
depends on
depends_on (List[str] or List[Step]): A list of step names or step instances
this `sagemaker.workflow.steps.TuningStep` depends on
"""
super(TuningStep, self).__init__(name, StepTypeEnum.TUNING, depends_on)
self.tuner = tuner
Expand Down Expand Up @@ -539,7 +559,7 @@ def to_request(self) -> RequestType:

return request_dict

def get_top_model_s3_uri(self, top_k: int, s3_bucket: str, prefix: str = ""):
def get_top_model_s3_uri(self, top_k: int, s3_bucket: str, prefix: str = "") -> Join:
"""Get the model artifact s3 uri from the top performing training jobs.

Args:
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ def test_two_processing_job_depends_on(

step_pyspark_2 = ProcessingStep(
name="pyspark-process-2",
depends_on=[step_pyspark_1.name],
depends_on=[step_pyspark_1],
processor=pyspark_processor,
inputs=spark_run_args.inputs,
outputs=spark_run_args.outputs,
Expand Down
Loading