Add README for airflow #507


Merged · 3 commits · Nov 20, 2018
11 changes: 11 additions & 0 deletions README.rst
@@ -38,6 +38,7 @@ Table of Contents
10. `SageMaker Batch Transform <#sagemaker-batch-transform>`__
11. `Secure Training and Inference with VPC <#secure-training-and-inference-with-vpc>`__
12. `BYO Model <#byo-model>`__
13. `SageMaker Workflow <#sagemaker-workflow>`__


Installing the SageMaker Python SDK
@@ -706,3 +707,13 @@ After that, invoke the ``deploy()`` method on the ``Model``:
This returns a predictor the same way an ``Estimator`` does when ``deploy()`` is called. You can now get inferences just like with any other model deployed on Amazon SageMaker.

A full example is available in the `Amazon SageMaker examples repository <https://github.com/awslabs/amazon-sagemaker-examples/tree/master/advanced_functionality/mxnet_mnist_byom>`__.


SageMaker Workflow
------------------

You can use Apache Airflow to author, schedule, and monitor SageMaker workflows.

For more information, see `SageMaker Workflow in Apache Airflow`_.

.. _SageMaker Workflow in Apache Airflow: src/sagemaker/workflow/README.rst
162 changes: 162 additions & 0 deletions src/sagemaker/workflow/README.rst
@@ -0,0 +1,162 @@
====================================
SageMaker Workflow in Apache Airflow
====================================

Apache Airflow
~~~~~~~~~~~~~~

`Apache Airflow <https://airflow.apache.org/index.html>`_
is a platform that enables you to programmatically author, schedule, and monitor workflows. Using Airflow,
you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment.
You can use any SageMaker deep learning framework or Amazon-provided algorithm to perform these operations in Airflow.

There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.
Both approaches are shown below.

1. SageMaker Operators: In Airflow 1.10.1, the SageMaker team contributed special operators for SageMaker operations.
Each operator takes a configuration dictionary that defines the corresponding operation. The SageMaker Python SDK
provides APIs to generate these configuration dictionaries. Currently, the following SageMaker operators are supported:

* ``SageMakerTrainingOperator``
* ``SageMakerTuningOperator``
* ``SageMakerModelOperator``
* ``SageMakerTransformOperator``
* ``SageMakerEndpointConfigOperator``
* ``SageMakerEndpointOperator``

2. PythonOperator: An Airflow built-in operator that executes Python callables. You can use the PythonOperator to execute
operations from the SageMaker Python SDK to create a SageMaker workflow.

Using Airflow on AWS
~~~~~~~~~~~~~~~~~~~~

Turbine is an open-source AWS CloudFormation template that enables you to create an Airflow resource stack on AWS.
You can get it here: https://github.com/villasv/aws-airflow-stack

Using Airflow SageMaker Operators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a configuration
dictionary that can be generated by the SageMaker Python SDK. For example:

.. code:: python

import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.airflow import training_config, transform_config_from_estimator

estimator = TensorFlow(entry_point='tf_train.py',
role='sagemaker-role',
framework_version='1.11.0',
training_steps=1000,
evaluation_steps=100,
train_instance_count=2,
train_instance_type='ml.p2.xlarge')

# train_config specifies SageMaker training configuration
train_config = training_config(estimator=estimator,
inputs=your_training_data_s3_uri)

# trans_config specifies SageMaker batch transform configuration
trans_config = transform_config_from_estimator(estimator=estimator,
instance_count=1,
instance_type='ml.m4.xlarge',
data=your_transform_data_s3_uri,
content_type='text/csv')
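
The SageMaker Python SDK provides similar config generators for the other operators. As a minimal sketch, here is how
you could generate a hyperparameter tuning configuration for ``SageMakerTuningOperator``, assuming the ``tuning_config``
helper is available in your SDK version (the objective metric name and regex below are illustrative):

.. code:: python

    from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
    from sagemaker.workflow.airflow import tuning_config

    # tune the learning rate of the estimator defined above
    # (the objective metric and regex are illustrative assumptions)
    tuner = HyperparameterTuner(estimator=estimator,
                                objective_metric_name='loss',
                                objective_type='Minimize',
                                metric_definitions=[{'Name': 'loss', 'Regex': 'loss = ([0-9\\.]+)'}],
                                hyperparameter_ranges={'learning_rate': ContinuousParameter(0.01, 0.2)},
                                max_jobs=4,
                                max_parallel_jobs=2)

    # tune_config specifies SageMaker hyperparameter tuning configuration
    tune_config = tuning_config(tuner=tuner, inputs=your_training_data_s3_uri)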

Now you can pass these configurations to the corresponding SageMaker operators and create the workflow:

.. code:: python

import airflow
from airflow import DAG
from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG('tensorflow_example', default_args=default_args,
schedule_interval='@once')

train_op = SageMakerTrainingOperator(
task_id='training',
config=train_config,
wait_for_completion=True,
dag=dag)

transform_op = SageMakerTransformOperator(
task_id='transform',
config=trans_config,
wait_for_completion=True,
dag=dag)

transform_op.set_upstream(train_op)
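
The remaining SageMaker operators follow the same pattern. For example, here is a minimal sketch that adds hyperparameter
tuning to the same DAG, using the ``tune_config`` generated earlier (``SageMakerTuningOperator`` is among the operators
contributed in Airflow 1.10.1):

.. code:: python

    from airflow.contrib.operators.sagemaker_tuning_operator import SageMakerTuningOperator

    tune_op = SageMakerTuningOperator(
        task_id='tuning',
        config=tune_config,
        wait_for_completion=True,
        dag=dag)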

Using Airflow Python Operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`Airflow PythonOperator <https://airflow.apache.org/howto/operator.html?#pythonoperator>`_
is a built-in operator that can execute any Python callable. If you want to build your SageMaker workflow in a more
flexible way, write your own Python callables for SageMaker operations using the SageMaker Python SDK.

.. code:: python

from sagemaker.tensorflow import TensorFlow

# callable for SageMaker training in TensorFlow
def train(data, **context):
estimator = TensorFlow(entry_point='tf_train.py',
role='sagemaker-role',
framework_version='1.11.0',
training_steps=1000,
evaluation_steps=100,
train_instance_count=2,
train_instance_type='ml.p2.xlarge')
estimator.fit(data)
return estimator.latest_training_job.job_name

# callable for SageMaker batch transform
def transform(data, **context):
    # the PythonOperator pushes the return value of `train` (the training
    # job name) to XCom; pull it here to attach to the finished training job
    training_job = context['ti'].xcom_pull(task_ids='training')
estimator = TensorFlow.attach(training_job)
transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
transformer.transform(data, content_type='text/csv')

Then build your workflow by using the PythonOperator with the Python callables defined above:

.. code:: python

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}

dag = DAG('tensorflow_example', default_args=default_args,
schedule_interval='@once')

train_op = PythonOperator(
task_id='training',
python_callable=train,
op_args=[training_data_s3_uri],
provide_context=True,
dag=dag)

transform_op = PythonOperator(
task_id='transform',
python_callable=transform,
op_args=[transform_data_s3_uri],
provide_context=True,
dag=dag)

transform_op.set_upstream(train_op)

This completes a workflow that runs a SageMaker training job followed by a batch transform job. You can customize the
Python callables with the SageMaker Python SDK according to your needs, and build more flexible and powerful workflows.
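
For example, you could extend this workflow with endpoint deployment. Below is a minimal sketch of a deployment
callable, assuming the ``training`` task from the workflow above (the instance type and count are illustrative):

.. code:: python

    from sagemaker.tensorflow import TensorFlow

    # callable for SageMaker endpoint deployment
    def deploy(**context):
        # pull the training job name pushed to XCom by the training task
        training_job = context['ti'].xcom_pull(task_ids='training')
        estimator = TensorFlow.attach(training_job)
        # deploy the trained model to a real-time SageMaker endpoint
        estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

You would register ``deploy`` with another ``PythonOperator``, just like ``train`` and ``transform``, and set its
upstream dependency with ``set_upstream``.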