
Add README for airflow #507


Merged: 3 commits, Nov 20, 2018
src/sagemaker/workflow/README.rst (162 additions, 0 deletions)
=============================
SageMaker Workflow in Airflow
=============================

Apache Airflow
~~~~~~~~~~~~~~

`Apache Airflow <https://airflow.apache.org/index.html>`_
is a platform that enables you to programmatically author, schedule, and monitor workflows. Using Airflow,
you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment.
You can use any SageMaker deep learning framework or Amazon algorithm to perform these operations in Airflow.

There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.

1. SageMaker Operators: In Airflow 1.10.1, the SageMaker team contributed special operators for SageMaker operations.
   Each operator takes a configuration dictionary that defines the corresponding operation. We provide APIs to
   generate the configuration dictionary in the SageMaker Python SDK. Currently, the following SageMaker operators
   are supported:

   * ``SageMakerTrainingOperator``
   * ``SageMakerTuningOperator``
   * ``SageMakerModelOperator``
   * ``SageMakerTransformOperator``
   * ``SageMakerEndpointConfigOperator``
   * ``SageMakerEndpointOperator``

2. PythonOperator: An Airflow built-in operator that executes Python callables. You can use the PythonOperator to
   call the SageMaker Python SDK and customize your workflow.

Using Airflow on AWS
~~~~~~~~~~~~~~~~~~~~

Turbine is an open-source AWS CloudFormation template that enables you to create an Airflow resource stack on AWS.
You can get it here: https://github.com/villasv/aws-airflow-stack

Using Airflow SageMaker Operators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a
configuration dictionary that can be generated by the SageMaker Python SDK. For example:

.. code:: python

    import sagemaker
    from sagemaker.tensorflow import TensorFlow
    from sagemaker.workflow.airflow import training_config, transform_config_from_estimator

    estimator = TensorFlow(entry_point='tf_train.py',
                           role='sagemaker-role',
                           framework_version='1.11.0',
                           training_steps=1000,
                           evaluation_steps=100,
                           train_instance_count=2,
                           train_instance_type='ml.p2.xlarge')

    # train_config specifies SageMaker training configuration
    train_config = training_config(estimator=estimator,
                                   inputs=your_training_data_s3_uri)

    # trans_config specifies SageMaker batch transform configuration
    trans_config = transform_config_from_estimator(estimator=estimator,
                                                   instance_count=1,
                                                   instance_type='ml.m4.xlarge',
                                                   data=your_transform_data_s3_uri,
                                                   content_type='text/csv')
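
These helper functions return plain Python dictionaries. A hedged note beyond what the example states: the keys of
``train_config`` mirror the request fields of the SageMaker ``CreateTrainingJob`` API, so you can inspect or adjust
individual values before handing the dictionary to an operator.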

Now you can pass these configurations to the corresponding SageMaker operators and create the workflow:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
    from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = SageMakerTrainingOperator(
        task_id='training',
        config=train_config,
        wait_for_completion=True,
        dag=dag)

    transform_op = SageMakerTransformOperator(
        task_id='transform',
        config=trans_config,
        wait_for_completion=True,
        dag=dag)

    transform_op.set_upstream(train_op)
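
(``set_upstream`` declares the task dependency; ``train_op >> transform_op`` is the equivalent bitshift form.)

The other operators in the list above follow the same pattern. As an illustrative sketch that is not part of the
original example (the tuner definition and metric regex below are hypothetical), a hyperparameter tuning step could
be added with ``tuning_config`` and the ``SageMakerTuningOperator``:

.. code:: python

    from sagemaker.tuner import HyperparameterTuner, ContinuousParameter
    from sagemaker.workflow.airflow import tuning_config
    from airflow.contrib.operators.sagemaker_tuning_operator import SageMakerTuningOperator

    # hypothetical tuner built on the estimator defined earlier
    tuner = HyperparameterTuner(estimator=estimator,
                                objective_metric_name='loss',
                                objective_type='Minimize',
                                metric_definitions=[{'Name': 'loss', 'Regex': 'loss = ([0-9\\.]+)'}],
                                hyperparameter_ranges={'learning_rate': ContinuousParameter(0.0001, 0.01)},
                                max_jobs=4,
                                max_parallel_jobs=2)

    # tune_config specifies SageMaker hyperparameter tuning configuration
    tune_config = tuning_config(tuner=tuner, inputs=your_training_data_s3_uri)

    tune_op = SageMakerTuningOperator(
        task_id='tuning',
        config=tune_config,
        wait_for_completion=True,
        dag=dag)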

Using Airflow Python Operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`Airflow PythonOperator <https://airflow.apache.org/howto/operator.html?#pythonoperator>`_
is a built-in operator that can execute any Python callable. If you want to build your SageMaker workflow in a more
flexible way, write your Python callables for SageMaker operations by using the SageMaker Python SDK. For example:

.. code:: python

    from sagemaker.tensorflow import TensorFlow

    # callable for SageMaker training in TensorFlow
    def train(data, **context):
        estimator = TensorFlow(entry_point='tf_train.py',
                               role='sagemaker-role',
                               framework_version='1.11.0',
                               training_steps=1000,
                               evaluation_steps=100,
                               train_instance_count=2,
                               train_instance_type='ml.p2.xlarge')
        estimator.fit(data)
        return estimator.latest_training_job.job_name

    # callable for SageMaker batch transform
    def transform(data, **context):
        training_job = context['ti'].xcom_pull(task_ids='training')
        estimator = TensorFlow.attach(training_job)
        transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
        transformer.transform(data, content_type='text/csv')
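
A note on how these two callables hand off state: the PythonOperator pushes a callable's return value to XCom, so the
job name returned by ``train`` is what ``transform`` retrieves with ``xcom_pull(task_ids='training')``. Receiving the
``context`` keyword arguments requires ``provide_context=True``, which the DAG below sets.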

Then build your workflow by using the PythonOperator with the Python callables defined above:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = PythonOperator(
        task_id='training',
        python_callable=train,
        op_args=[training_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op = PythonOperator(
        task_id='transform',
        python_callable=transform,
        op_args=[transform_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op.set_upstream(train_op)

A workflow that runs a SageMaker training job and a batch transform job is finished. You can customize your Python
callables with the SageMaker Python SDK according to your needs, and build more flexible and powerful workflows.
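
For example, a deployment step could be appended with another callable. This is a minimal sketch, not part of the
original example; the endpoint settings are hypothetical:

.. code:: python

    from sagemaker.tensorflow import TensorFlow

    # callable for deploying the trained model to a SageMaker endpoint
    def deploy(**context):
        # pull the training job name pushed to XCom by the 'training' task
        training_job = context['ti'].xcom_pull(task_ids='training')
        estimator = TensorFlow.attach(training_job)
        # hypothetical endpoint settings
        estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

    deploy_op = PythonOperator(
        task_id='deploy',
        python_callable=deploy,
        provide_context=True,
        dag=dag)

    deploy_op.set_upstream(transform_op)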