Commit a4fb9a4 (parent 9c27fd9): Add README for airflow

File tree: 1 file changed (+162, −0 lines)


src/sagemaker/workflow/README.rst

=============================
SageMaker Workflow in Airflow
=============================

Apache Airflow
~~~~~~~~~~~~~~

`Apache Airflow <https://airflow.apache.org/index.html>`_
is a platform that enables you to programmatically author, schedule, and monitor workflows. Using Airflow,
you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment.
You can use any SageMaker deep learning framework or Amazon algorithm to perform the above operations in Airflow.
There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.

1. SageMaker Operators: Since Airflow 1.10.1, we have contributed special operators just for SageMaker operations.
   Each operator takes a configuration dictionary that defines the corresponding operation, and the SageMaker Python SDK
   provides APIs to generate these configuration dictionaries. The currently supported SageMaker operators are:

   * ``SageMakerTrainingOperator``
   * ``SageMakerTuningOperator``
   * ``SageMakerModelOperator``
   * ``SageMakerTransformOperator``
   * ``SageMakerEndpointConfigOperator``
   * ``SageMakerEndpointOperator``

2. PythonOperator: An Airflow built-in operator that can execute Python callables. You can use the SageMaker Python SDK
   to customize your workflow with the PythonOperator.
Using Airflow on AWS
~~~~~~~~~~~~~~~~~~~~

Turbine is an open-source AWS CloudFormation template for creating an Airflow resource stack on AWS.
You can get it here: https://github.com/villasv/aws-airflow-stack
Using Airflow SageMaker Operators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a
configuration dictionary that can be easily generated by the SageMaker Python SDK. For example:
.. code:: python

    import sagemaker
    from sagemaker.tensorflow import TensorFlow
    from sagemaker.workflow.airflow import training_config, transform_config_from_estimator

    estimator = TensorFlow(entry_point='tf_train.py',
                           role='sagemaker-role',
                           framework_version='1.11.0',
                           training_steps=1000,
                           evaluation_steps=100,
                           train_instance_count=2,
                           train_instance_type='ml.p2.xlarge')

    # train_config specifies SageMaker training configuration
    train_config = training_config(estimator=estimator,
                                   inputs=your_training_data_s3_uri)

    # trans_config specifies SageMaker batch transform configuration
    trans_config = transform_config_from_estimator(estimator=estimator,
                                                   instance_count=1,
                                                   instance_type='ml.m4.xlarge',
                                                   data=your_transform_data_s3_uri,
                                                   content_type='text/csv')
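The dict returned by ``training_config`` is a plain Python dict shaped like the underlying ``CreateTrainingJob`` API request, so you can inspect or tweak it before handing it to an operator. A rough, hand-written sketch of that shape (the field names follow the public API; every value below is a placeholder, not real SDK output):

```python
# Hypothetical, hand-written example of a training config dict.
# Field names follow the SageMaker CreateTrainingJob request shape;
# all values are placeholders, not output of training_config().
train_config = {
    'TrainingJobName': 'tf-training-example-job',
    'AlgorithmSpecification': {
        'TrainingImage': '<ecr-image-uri>',
        'TrainingInputMode': 'File',
    },
    'RoleArn': 'arn:aws:iam::111111111111:role/sagemaker-role',
    'ResourceConfig': {
        'InstanceCount': 2,
        'InstanceType': 'ml.p2.xlarge',
        'VolumeSizeInGB': 30,
    },
    'OutputDataConfig': {'S3OutputPath': 's3://your-bucket/output'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
}

# The SageMaker operator forwards a dict like this to the corresponding
# SageMaker API call, so printing it is a quick sanity check.
print(sorted(train_config))
```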
Now we can pass these configurations to the corresponding SageMaker operators and create the workflow:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
    from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = SageMakerTrainingOperator(
        task_id='training',
        config=train_config,
        wait_for_completion=True,
        dag=dag)

    transform_op = SageMakerTransformOperator(
        task_id='transform',
        config=trans_config,
        wait_for_completion=True,
        dag=dag)

    transform_op.set_upstream(train_op)
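``set_upstream`` records the edge ``training -> transform`` in the DAG, and the Airflow scheduler then runs tasks in a dependency-respecting order. As an illustration of the idea only (not Airflow internals), that scheduling constraint amounts to a topological ordering over the task graph:

```python
from graphlib import TopologicalSorter  # Python 3.9+ standard library

# Each task maps to the set of tasks it depends on, mirroring
# transform_op.set_upstream(train_op) in the DAG above.
dependencies = {
    'training': set(),
    'transform': {'training'},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # 'training' is guaranteed to come before 'transform'
```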
Using Airflow Python Operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`Airflow PythonOperator <https://airflow.apache.org/howto/operator.html?#pythonoperator>`_
is a built-in operator that can execute any Python callable. If you want to build your SageMaker workflow in a more
flexible way, you can write your own Python callables for SageMaker operations using the SageMaker Python SDK. For example:
105+
.. code:: python
106+
107+
from sagemaker.tensorflow import TensorFlow
108+
109+
# callable for SageMaker training in TensorFlow
110+
def train(data, **context):
111+
estimator = TensorFlow(entry_point='tf_train.py',
112+
role='sagemaker-role',
113+
framework_version='1.11.0',
114+
training_steps=1000,
115+
evaluation_steps=100,
116+
train_instance_count=2,
117+
train_instance_type='ml.p2.xlarge')
118+
estimator.fit(data)
119+
return estimator.latest_training_job.job_name
120+
121+
# callable for SageMaker batch transform
122+
def transform(data, **context):
123+
training_job = context['ti'].xcom_pull(task_ids='training')
124+
estimator = TensorFlow.attach(training_job)
125+
transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
126+
transformer.transform(data, content_type='text/csv')
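The two callables communicate through XCom: Airflow pushes a callable's return value to XCom under its task id, so ``train`` publishes the training job name and ``transform`` retrieves it with ``xcom_pull``. A toy sketch of that handoff, mimicking rather than using Airflow's XCom store:

```python
# Toy stand-in for Airflow's XCom table: values keyed by task id.
# This mimics the push/pull pattern; it is not Airflow's implementation.
xcom_store = {}

def run_task(task_id, fn, *args, **context):
    """Run a callable and push its return value, as Airflow does for PythonOperator."""
    result = fn(*args, **context)
    xcom_store[task_id] = result
    return result

def train(data, **context):
    # placeholder for the real SageMaker training call; returns a job name
    return 'tensorflow-training-2019-01-01-00-00-00-000'

def transform(data, **context):
    # the downstream task pulls the value the upstream task pushed
    training_job = xcom_store['training']
    return f'transforming {data} with model from {training_job}'

run_task('training', train, 's3://your-bucket/train')
message = run_task('transform', transform, 's3://your-bucket/test')
```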
Then you can build your workflow using the PythonOperator with the Python callables defined above:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = PythonOperator(
        task_id='training',
        python_callable=train,
        op_args=[training_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op = PythonOperator(
        task_id='transform',
        python_callable=transform,
        op_args=[transform_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op.set_upstream(train_op)
A workflow with SageMaker training and batch transform is finished! In this way, you can customize your Python
callables with the SageMaker Python SDK according to your needs and build more flexible and powerful workflows.
