diff --git a/src/sagemaker/workflow/execution_variables.py b/src/sagemaker/workflow/execution_variables.py
index a69d41492c..e2e2a0a666 100644
--- a/src/sagemaker/workflow/execution_variables.py
+++ b/src/sagemaker/workflow/execution_variables.py
@@ -13,56 +13,27 @@
 """Pipeline parameters and conditions for workflow."""
 from __future__ import absolute_import
 
-from typing import Dict
-
 from sagemaker.workflow.entities import (
-    Entity,
+    Expression,
     RequestType,
 )
 
 
-class ExecutionVariable(Entity, str):
+class ExecutionVariable(Expression):
     """Pipeline execution variables for workflow."""
 
-    def __new__(cls, *args, **kwargs):  # pylint: disable=unused-argument
-        """Subclass str"""
-        value = ""
-        if len(args) == 1:
-            value = args[0] or value
-        elif kwargs:
-            value = kwargs.get("name", value)
-        return str.__new__(cls, ExecutionVariable._expr(value))
-
     def __init__(self, name: str):
         """Create a pipeline execution variable.
 
         Args:
             name (str): The name of the execution variable.
         """
-        super(ExecutionVariable, self).__init__()
         self.name = name
 
-    def __hash__(self):
-        """Hash function for execution variable types"""
-        return hash(tuple(self.to_request()))
-
-    def to_request(self) -> RequestType:
-        """Get the request structure for workflow service calls."""
-        return self.expr
-
     @property
-    def expr(self) -> Dict[str, str]:
+    def expr(self) -> RequestType:
         """The 'Get' expression dict for an `ExecutionVariable`."""
-        return ExecutionVariable._expr(self.name)
-
-    @classmethod
-    def _expr(cls, name):
-        """An internal classmethod for the 'Get' expression dict for an `ExecutionVariable`.
-
-        Args:
-            name (str): The name of the execution variable.
- """ - return {"Get": f"Execution.{name}"} + return {"Get": f"Execution.{self.name}"} class ExecutionVariables: diff --git a/src/sagemaker/workflow/pipeline.py b/src/sagemaker/workflow/pipeline.py index d3d42e1b49..e4518ae250 100644 --- a/src/sagemaker/workflow/pipeline.py +++ b/src/sagemaker/workflow/pipeline.py @@ -16,7 +16,7 @@ import json from copy import deepcopy -from typing import Any, Dict, List, Sequence, Union +from typing import Any, Dict, List, Sequence, Union, Optional import attr import botocore @@ -30,7 +30,9 @@ Expression, RequestType, ) +from sagemaker.workflow.execution_variables import ExecutionVariables from sagemaker.workflow.parameters import Parameter +from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig from sagemaker.workflow.properties import Properties from sagemaker.workflow.steps import Step from sagemaker.workflow.step_collections import StepCollection @@ -44,6 +46,12 @@ class Pipeline(Entity): Attributes: name (str): The name of the pipeline. parameters (Sequence[Parameters]): The list of the parameters. + pipeline_experiment_config (Optional[PipelineExperimentConfig]): If set, + the workflow will attempt to create an experiment and trial before + executing the steps. Creation will be skipped if an experiment or a trial with + the same name already exists. By default, pipeline name is used as + experiment name and execution id is used as the trial name. + If set to None, no experiment or trial will be created automatically. steps (Sequence[Steps]): The list of the non-conditional steps associated with the pipeline. 
             Any steps that are within the `if_steps` or `else_steps` of a
             `ConditionStep` cannot be listed in the steps of a
@@ -57,6 +65,11 @@ class Pipeline(Entity):
 
     name: str = attr.ib(factory=str)
     parameters: Sequence[Parameter] = attr.ib(factory=list)
+    pipeline_experiment_config: Optional[PipelineExperimentConfig] = attr.ib(
+        default=PipelineExperimentConfig(
+            ExecutionVariables.PIPELINE_NAME, ExecutionVariables.PIPELINE_EXECUTION_ID
+        )
+    )
     steps: Sequence[Union[Step, StepCollection]] = attr.ib(factory=list)
     sagemaker_session: Session = attr.ib(factory=Session)
 
@@ -69,6 +82,9 @@ def to_request(self) -> RequestType:
             "Version": self._version,
             "Metadata": self._metadata,
             "Parameters": list_to_request(self.parameters),
+            "PipelineExperimentConfig": self.pipeline_experiment_config.to_request()
+            if self.pipeline_experiment_config is not None
+            else None,
             "Steps": list_to_request(self.steps),
         }
 
@@ -76,7 +92,6 @@ def create(
         self,
         role_arn: str,
         description: str = None,
-        experiment_name: str = None,
         tags: List[Dict[str, str]] = None,
     ) -> Dict[str, Any]:
         """Creates a Pipeline in the Pipelines service.
@@ -84,7 +99,6 @@ def create(
         Args:
             role_arn (str): The role arn that is assumed by the pipeline to create step artifacts.
             description (str): A description of the pipeline.
-            experiment_name (str): The name of the experiment.
             tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
                 tags.
 
@@ -96,7 +110,6 @@ def create(
         kwargs = self._create_args(role_arn, description)
         update_args(
             kwargs,
-            ExperimentName=experiment_name,
             Tags=tags,
         )
         return self.sagemaker_session.sagemaker_client.create_pipeline(**kwargs)
@@ -106,7 +119,7 @@ def _create_args(self, role_arn: str, description: str):
 
         Args:
             role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
-            pipeline_description (str): A description of the pipeline.
+            description (str): A description of the pipeline.
 
         Returns:
             A keyword argument dict for calling create_pipeline.
@@ -147,15 +160,13 @@ def upsert(
         self,
         role_arn: str,
         description: str = None,
-        experiment_name: str = None,
         tags: List[Dict[str, str]] = None,
     ) -> Dict[str, Any]:
         """Creates a pipeline or updates it, if it already exists.
 
         Args:
             role_arn (str): The role arn that is assumed by workflow to create step artifacts.
-            pipeline_description (str): A description of the pipeline.
-            experiment_name (str): The name of the experiment.
+            description (str): A description of the pipeline.
             tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
                 tags.
 
@@ -163,7 +174,7 @@ def upsert(
             response dict from service
         """
         try:
-            response = self.create(role_arn, description, experiment_name, tags)
+            response = self.create(role_arn, description, tags)
         except ClientError as e:
             error = e.response["Error"]
             if (
@@ -224,6 +235,9 @@ def start(
 
     def definition(self) -> str:
         """Converts a request structure to string representation for workflow service calls."""
         request_dict = self.to_request()
+        request_dict["PipelineExperimentConfig"] = interpolate(
+            request_dict["PipelineExperimentConfig"]
+        )
         request_dict["Steps"] = interpolate(request_dict["Steps"])
         return json.dumps(request_dict)
diff --git a/src/sagemaker/workflow/pipeline_experiment_config.py b/src/sagemaker/workflow/pipeline_experiment_config.py
new file mode 100644
index 0000000000..dc7b91b9cf
--- /dev/null
+++ b/src/sagemaker/workflow/pipeline_experiment_config.py
@@ -0,0 +1,76 @@
+# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Pipeline experiment config for SageMaker pipeline."""
+from __future__ import absolute_import
+
+from typing import Union
+
+from sagemaker.workflow.parameters import Parameter
+from sagemaker.workflow.execution_variables import ExecutionVariable
+from sagemaker.workflow.entities import (
+    Entity,
+    Expression,
+    RequestType,
+)
+
+
+class PipelineExperimentConfig(Entity):
+    """Experiment config for SageMaker pipeline."""
+
+    def __init__(
+        self,
+        experiment_name: Union[str, Parameter, ExecutionVariable, Expression],
+        trial_name: Union[str, Parameter, ExecutionVariable, Expression],
+    ):
+        """Create a PipelineExperimentConfig
+
+        Args:
+            experiment_name: the name of the experiment that will be created
+            trial_name: the name of the trial that will be created
+        """
+        self.experiment_name = experiment_name
+        self.trial_name = trial_name
+
+    def to_request(self) -> RequestType:
+        """Returns: the request structure."""
+
+        return {
+            "ExperimentName": self.experiment_name,
+            "TrialName": self.trial_name,
+        }
+
+
+class PipelineExperimentConfigProperty(Expression):
+    """Reference to pipeline experiment config property."""
+
+    def __init__(self, name: str):
+        """Create a reference to pipeline experiment property.
+
+        Args:
+            name (str): The name of the pipeline experiment config property.
+ """ + super(PipelineExperimentConfigProperty, self).__init__() + self.name = name + + @property + def expr(self) -> RequestType: + """The 'Get' expression dict for a pipeline experiment config property.""" + + return {"Get": f"PipelineExperimentConfig.{self.name}"} + + +class PipelineExperimentConfigProperties: + """Enum-like class for all pipeline experiment config property references.""" + + EXPERIMENT_NAME = PipelineExperimentConfigProperty("ExperimentName") + TRIAL_NAME = PipelineExperimentConfigProperty("TrialName") diff --git a/tests/integ/test_workflow_experiment.py b/tests/integ/test_workflow_experiment.py new file mode 100644 index 0000000000..9a7b21f334 --- /dev/null +++ b/tests/integ/test_workflow_experiment.py @@ -0,0 +1,219 @@ +# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import
+
+import os
+import time
+
+import pytest
+
+from botocore.exceptions import WaiterError
+from sagemaker.processing import ProcessingInput
+from sagemaker.session import get_execution_role
+from sagemaker.sklearn.processing import SKLearnProcessor
+from sagemaker.dataset_definition.inputs import DatasetDefinition, AthenaDatasetDefinition
+from sagemaker.workflow.execution_variables import ExecutionVariables
+from sagemaker.workflow.functions import Join
+from sagemaker.workflow.parameters import (
+    ParameterInteger,
+)
+from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
+from sagemaker.workflow.steps import ProcessingStep
+from sagemaker.workflow.pipeline import Pipeline
+from tests.integ import DATA_DIR
+
+
+@pytest.fixture(scope="module")
+def region_name(sagemaker_session):
+    return sagemaker_session.boto_session.region_name
+
+
+@pytest.fixture(scope="module")
+def role(sagemaker_session):
+    return get_execution_role(sagemaker_session)
+
+
+@pytest.fixture(scope="module")
+def script_dir():
+    return os.path.join(DATA_DIR, "sklearn_processing")
+
+
+@pytest.fixture
+def pipeline_name():
+    return f"my-pipeline-{int(time.time() * 10**7)}"
+
+
+@pytest.fixture
+def smclient(sagemaker_session):
+    return sagemaker_session.boto_session.client("sagemaker")
+
+
+@pytest.fixture
+def athena_dataset_definition(sagemaker_session):
+    return DatasetDefinition(
+        local_path="/opt/ml/processing/input/add",
+        data_distribution_type="FullyReplicated",
+        input_mode="File",
+        athena_dataset_definition=AthenaDatasetDefinition(
+            catalog="AwsDataCatalog",
+            database="default",
+            work_group="workgroup",
+            query_string='SELECT * FROM "default"."s3_test_table_$STAGE_$REGIONUNDERSCORED";',
+            output_s3_uri=f"s3://{sagemaker_session.default_bucket()}/add",
+            output_format="JSON",
+            output_compression="GZIP",
+        ),
+    )
+
+
+def test_pipeline_execution_with_default_experiment_config(
+    sagemaker_session,
+    smclient,
+    role,
+    sklearn_latest_version,
+    cpu_instance_type,
+    pipeline_name,
+    athena_dataset_definition,
+):
+    instance_count = ParameterInteger(name="InstanceCount", default_value=2)
+    script_path = os.path.join(DATA_DIR, "dummy_script.py")
+    input_file_path = os.path.join(DATA_DIR, "dummy_input.txt")
+    inputs = [
+        ProcessingInput(source=input_file_path, destination="/opt/ml/processing/inputs/"),
+        ProcessingInput(dataset_definition=athena_dataset_definition),
+    ]
+
+    sklearn_processor = SKLearnProcessor(
+        framework_version=sklearn_latest_version,
+        role=role,
+        instance_type=cpu_instance_type,
+        instance_count=instance_count,
+        command=["python3"],
+        sagemaker_session=sagemaker_session,
+        base_job_name="test-sklearn",
+    )
+
+    step_sklearn = ProcessingStep(
+        name="sklearn-process",
+        processor=sklearn_processor,
+        inputs=inputs,
+        code=script_path,
+    )
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count],
+        steps=[step_sklearn],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        pipeline.create(role)
+        execution = pipeline.start(parameters={})
+
+        try:
+            execution.wait(delay=30, max_attempts=3)
+        except WaiterError:
+            pass
+        execution_steps = execution.list_steps()
+        assert len(execution_steps) == 1
+        assert execution_steps[0]["StepName"] == "sklearn-process"
+
+        execution_id = execution.arn.split("/")[-1]
+
+        # trial components
+        trial_components = smclient.list_trial_components(TrialName=execution_id)
+        assert len(trial_components["TrialComponentSummaries"]) == 1
+
+        # trial details
+        trial = smclient.describe_trial(TrialName=execution_id)
+        assert pipeline_name == trial["ExperimentName"]
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_pipeline_execution_with_custom_experiment_config(
+    sagemaker_session,
+    smclient,
+    role,
+    sklearn_latest_version,
+    cpu_instance_type,
+    pipeline_name,
+    athena_dataset_definition,
+):
+    instance_count = ParameterInteger(name="InstanceCount", default_value=2)
+    script_path = os.path.join(DATA_DIR, "dummy_script.py")
+    input_file_path = os.path.join(DATA_DIR, "dummy_input.txt")
+    inputs = [
+        ProcessingInput(source=input_file_path, destination="/opt/ml/processing/inputs/"),
+        ProcessingInput(dataset_definition=athena_dataset_definition),
+    ]
+
+    sklearn_processor = SKLearnProcessor(
+        framework_version=sklearn_latest_version,
+        role=role,
+        instance_type=cpu_instance_type,
+        instance_count=instance_count,
+        command=["python3"],
+        sagemaker_session=sagemaker_session,
+        base_job_name="test-sklearn",
+    )
+
+    step_sklearn = ProcessingStep(
+        name="sklearn-process",
+        processor=sklearn_processor,
+        inputs=inputs,
+        code=script_path,
+    )
+
+    experiment_name = f"my-experiment-{int(time.time() * 10**7)}"
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count],
+        pipeline_experiment_config=PipelineExperimentConfig(
+            experiment_name=experiment_name,
+            trial_name=Join(on="-", values=["my-trial", ExecutionVariables.PIPELINE_EXECUTION_ID]),
+        ),
+        steps=[step_sklearn],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        pipeline.create(role)
+        execution = pipeline.start(parameters={})
+
+        try:
+            execution.wait(delay=30, max_attempts=3)
+        except WaiterError:
+            pass
+        execution_steps = execution.list_steps()
+        assert len(execution_steps) == 1
+        assert execution_steps[0]["StepName"] == "sklearn-process"
+
+        execution_id = execution.arn.split("/")[-1]
+
+        # trial components
+        trial_components = smclient.list_trial_components(TrialName=f"my-trial-{execution_id}")
+        assert len(trial_components["TrialComponentSummaries"]) == 1
+
+        # trial details
+        trial = smclient.describe_trial(TrialName=f"my-trial-{execution_id}")
+        assert experiment_name == trial["ExperimentName"]
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
diff --git a/tests/unit/sagemaker/workflow/test_execution_variables.py b/tests/unit/sagemaker/workflow/test_execution_variables.py
index 9a38ff7712..779b798bb9 100644
--- a/tests/unit/sagemaker/workflow/test_execution_variables.py
+++ b/tests/unit/sagemaker/workflow/test_execution_variables.py
@@ -18,6 +18,4 @@
 def test_execution_variable():
     var = ExecutionVariables.START_DATETIME
 
-    assert var.to_request() == {"Get": "Execution.StartDateTime"}
     assert var.expr == {"Get": "Execution.StartDateTime"}
-    assert isinstance(var, str)
diff --git a/tests/unit/sagemaker/workflow/test_pipeline.py b/tests/unit/sagemaker/workflow/test_pipeline.py
index c7c6ed7afa..642d11476f 100644
--- a/tests/unit/sagemaker/workflow/test_pipeline.py
+++ b/tests/unit/sagemaker/workflow/test_pipeline.py
@@ -21,8 +21,13 @@
 
 from mock import Mock
 
+from sagemaker.workflow.execution_variables import ExecutionVariables
 from sagemaker.workflow.parameters import ParameterString
 from sagemaker.workflow.pipeline import Pipeline
+from sagemaker.workflow.pipeline_experiment_config import (
+    PipelineExperimentConfig,
+    PipelineExperimentConfigProperties,
+)
 from sagemaker.workflow.properties import Properties
 from sagemaker.workflow.steps import (
     Step,
@@ -184,6 +189,10 @@ def test_pipeline_basic():
         "Version": "2020-12-01",
         "Metadata": {},
         "Parameters": [{"Name": "MyStr", "Type": "String"}],
+        "PipelineExperimentConfig": {
+            "ExperimentName": ExecutionVariables.PIPELINE_NAME,
+            "TrialName": ExecutionVariables.PIPELINE_EXECUTION_ID,
+        },
         "Steps": [{"Name": "MyStep", "Type": "Training", "Arguments": {"input_data": parameter}}],
     }
     assert ordered(json.loads(pipeline.definition())) == ordered(
@@ -191,6 +200,10 @@ def test_pipeline_basic():
             "Version": "2020-12-01",
             "Metadata": {},
             "Parameters": [{"Name": "MyStr", "Type": "String"}],
+            "PipelineExperimentConfig": {
+                "ExperimentName": {"Get": "Execution.PipelineName"},
+                "TrialName": {"Get": "Execution.PipelineExecutionId"},
+            },
             "Steps": [
                 {
                     "Name": "MyStep",
@@ -204,8 +217,15 @@ def test_pipeline_basic():
 
 def test_pipeline_two_step(sagemaker_session_mock):
     parameter = ParameterString("MyStr")
-    step1 = CustomStep(name="MyStep1", input_data=parameter)
-    step2 = CustomStep(name="MyStep2", input_data=step1.properties.S3Uri)
+    step1 = CustomStep(
+        name="MyStep1",
+        input_data=[
+            parameter,  # parameter reference
+            ExecutionVariables.PIPELINE_EXECUTION_ID,  # execution variable
+            PipelineExperimentConfigProperties.EXPERIMENT_NAME,  # experiment config property
+        ],
+    )
+    step2 = CustomStep(name="MyStep2", input_data=[step1.properties.S3Uri])  # step property
     pipeline = Pipeline(
         name="MyPipeline",
         parameters=[parameter],
@@ -216,12 +236,26 @@ def test_pipeline_two_step(sagemaker_session_mock):
         "Version": "2020-12-01",
         "Metadata": {},
         "Parameters": [{"Name": "MyStr", "Type": "String"}],
+        "PipelineExperimentConfig": {
+            "ExperimentName": ExecutionVariables.PIPELINE_NAME,
+            "TrialName": ExecutionVariables.PIPELINE_EXECUTION_ID,
+        },
         "Steps": [
-            {"Name": "MyStep1", "Type": "Training", "Arguments": {"input_data": parameter}},
+            {
+                "Name": "MyStep1",
+                "Type": "Training",
+                "Arguments": {
+                    "input_data": [
+                        parameter,
+                        ExecutionVariables.PIPELINE_EXECUTION_ID,
+                        PipelineExperimentConfigProperties.EXPERIMENT_NAME,
+                    ]
+                },
+            },
             {
                 "Name": "MyStep2",
                 "Type": "Training",
-                "Arguments": {"input_data": step1.properties.S3Uri},
+                "Arguments": {"input_data": [step1.properties.S3Uri]},
             },
         ],
     }
@@ -230,22 +264,80 @@ def test_pipeline_two_step(sagemaker_session_mock):
             "Version": "2020-12-01",
             "Metadata": {},
             "Parameters": [{"Name": "MyStr", "Type": "String"}],
+            "PipelineExperimentConfig": {
+                "ExperimentName": {"Get": "Execution.PipelineName"},
+                "TrialName": {"Get": "Execution.PipelineExecutionId"},
+            },
             "Steps": [
                 {
                     "Name": "MyStep1",
                     "Type": "Training",
-                    "Arguments": {"input_data": {"Get": "Parameters.MyStr"}},
+                    "Arguments": {
+                        "input_data": [
+                            {"Get": "Parameters.MyStr"},
+                            {"Get": "Execution.PipelineExecutionId"},
+                            {"Get": "PipelineExperimentConfig.ExperimentName"},
+                        ]
+                    },
                 },
                 {
                     "Name": "MyStep2",
                     "Type": "Training",
-                    "Arguments": {"input_data": {"Get": "Steps.MyStep1.S3Uri"}},
+                    "Arguments": {"input_data": [{"Get": "Steps.MyStep1.S3Uri"}]},
                 },
             ],
         }
     )
 
 
+def test_pipeline_override_experiment_config():
+    pipeline = Pipeline(
+        name="MyPipeline",
+        pipeline_experiment_config=PipelineExperimentConfig("MyExperiment", "MyTrial"),
+        steps=[CustomStep(name="MyStep", input_data="input")],
+        sagemaker_session=sagemaker_session_mock,
+    )
+    assert ordered(json.loads(pipeline.definition())) == ordered(
+        {
+            "Version": "2020-12-01",
+            "Metadata": {},
+            "Parameters": [],
+            "PipelineExperimentConfig": {"ExperimentName": "MyExperiment", "TrialName": "MyTrial"},
+            "Steps": [
+                {
+                    "Name": "MyStep",
+                    "Type": "Training",
+                    "Arguments": {"input_data": "input"},
+                }
+            ],
+        }
+    )
+
+
+def test_pipeline_disable_experiment_config():
+    pipeline = Pipeline(
+        name="MyPipeline",
+        pipeline_experiment_config=None,
+        steps=[CustomStep(name="MyStep", input_data="input")],
+        sagemaker_session=sagemaker_session_mock,
+    )
+    assert ordered(json.loads(pipeline.definition())) == ordered(
+        {
+            "Version": "2020-12-01",
+            "Metadata": {},
+            "Parameters": [],
+            "PipelineExperimentConfig": None,
+            "Steps": [
+                {
+                    "Name": "MyStep",
+                    "Type": "Training",
+                    "Arguments": {"input_data": "input"},
+                }
+            ],
+        }
+    )
+
+
 def test_pipeline_execution_basics(sagemaker_session_mock):
     sagemaker_session_mock.sagemaker_client.start_pipeline_execution.return_value = {
         "PipelineExecutionArn": "my:arn"
diff --git a/tests/unit/sagemaker/workflow/test_pipeline_experiment_config.py b/tests/unit/sagemaker/workflow/test_pipeline_experiment_config.py
new file mode 100644
index 0000000000..0c77f56e3d
--- /dev/null
+++ b/tests/unit/sagemaker/workflow/test_pipeline_experiment_config.py
@@ -0,0 +1,28 @@
+# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+from sagemaker.workflow.pipeline_experiment_config import (
+    PipelineExperimentConfig,
+    PipelineExperimentConfigProperties,
+)
+
+
+def test_pipeline_experiment_config():
+    config = PipelineExperimentConfig("experiment-name", "trial-name")
+    assert config.to_request() == {"ExperimentName": "experiment-name", "TrialName": "trial-name"}
+
+
+def test_pipeline_experiment_config_property():
+    var = PipelineExperimentConfigProperties.EXPERIMENT_NAME
+    assert var.expr == {"Get": "PipelineExperimentConfig.ExperimentName"}