====================================
SageMaker Workflow in Apache Airflow
====================================

Apache Airflow
~~~~~~~~~~~~~~

`Apache Airflow <https://airflow.apache.org/index.html>`_
is a platform that enables you to programmatically author, schedule, and monitor workflows. Using Airflow,
you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment.
You can use any SageMaker deep learning framework or Amazon algorithm to perform these operations in Airflow.

There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.

1. SageMaker Operators: In Airflow 1.10.1, the SageMaker team contributed special operators for SageMaker operations.
   Each operator takes a configuration dictionary that defines the corresponding operation. We provide APIs to
   generate the configuration dictionary in the SageMaker Python SDK. Currently, the following SageMaker operators
   are supported:

   * ``SageMakerTrainingOperator``
   * ``SageMakerTuningOperator``
   * ``SageMakerModelOperator``
   * ``SageMakerTransformOperator``
   * ``SageMakerEndpointConfigOperator``
   * ``SageMakerEndpointOperator``

2. PythonOperator: An Airflow built-in operator that executes Python callables. You can use the PythonOperator
   to execute operations in the SageMaker Python SDK to create a SageMaker workflow.

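For illustration, here is a sketch of the shape of the configuration dictionary that a SageMaker operator
consumes. This example is not generated by the SDK; the keys mirror the SageMaker ``CreateTrainingJob`` API
request, and all names, ARNs, and S3 URIs below are placeholder values:

.. code:: python

    # Illustrative sketch (not SDK output): the dictionary produced by
    # training_config() mirrors the SageMaker CreateTrainingJob API request.
    # All ARNs, URIs, and names here are placeholders.
    train_config = {
        "TrainingJobName": "tf-training-2019-01-01-00-00-00",
        "AlgorithmSpecification": {
            "TrainingImage": "<ecr-image-uri>",
            "TrainingInputMode": "File",
        },
        "RoleArn": "arn:aws:iam::111111111111:role/sagemaker-role",
        "InputDataConfig": [
            {
                "ChannelName": "training",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": "s3://my-bucket/training-data",
                    }
                },
            }
        ],
        "OutputDataConfig": {"S3OutputPath": "s3://my-bucket/output"},
        "ResourceConfig": {
            "InstanceCount": 2,
            "InstanceType": "ml.p2.xlarge",
            "VolumeSizeInGB": 30,
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": 86400},
    }

    # When the task runs, the operator forwards this dictionary to the
    # corresponding SageMaker API call.
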
Using Airflow on AWS
~~~~~~~~~~~~~~~~~~~~

Turbine is an open-source AWS CloudFormation template that enables you to create an Airflow resource stack on AWS.
You can get it here: https://github.com/villasv/aws-airflow-stack

Using Airflow SageMaker Operators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a
configuration dictionary that can be generated by the SageMaker Python SDK. For example:

.. code:: python

    import sagemaker
    from sagemaker.tensorflow import TensorFlow
    from sagemaker.workflow.airflow import training_config, transform_config_from_estimator

    estimator = TensorFlow(entry_point='tf_train.py',
                           role='sagemaker-role',
                           framework_version='1.11.0',
                           training_steps=1000,
                           evaluation_steps=100,
                           train_instance_count=2,
                           train_instance_type='ml.p2.xlarge')

    # train_config specifies SageMaker training configuration
    train_config = training_config(estimator=estimator,
                                   inputs=your_training_data_s3_uri)

    # trans_config specifies SageMaker batch transform configuration
    trans_config = transform_config_from_estimator(estimator=estimator,
                                                   instance_count=1,
                                                   instance_type='ml.m4.xlarge',
                                                   data=your_transform_data_s3_uri,
                                                   content_type='text/csv')

Now you can pass these configurations to the corresponding SageMaker operators and create the workflow:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
    from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = SageMakerTrainingOperator(
        task_id='training',
        config=train_config,
        wait_for_completion=True,
        dag=dag)

    transform_op = SageMakerTransformOperator(
        task_id='transform',
        config=trans_config,
        wait_for_completion=True,
        dag=dag)

    transform_op.set_upstream(train_op)

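The ``set_upstream`` call is what orders the tasks: Airflow runs the transform task only after the training
task succeeds. Conceptually this is a topological ordering of the task graph. The toy function below is plain
Python, not Airflow code, and the task names are taken from the example above purely for illustration:

.. code:: python

    # A toy model (not Airflow's scheduler) of how upstream dependencies
    # determine execution order: a depth-first topological sort.
    def topo_order(tasks, upstream):
        """Return task ids ordered so each task follows all its upstream tasks.

        tasks: list of task ids; upstream: dict mapping task id -> set of
        upstream task ids (i.e., tasks that must run first).
        """
        order, done = [], set()

        def visit(task_id):
            if task_id in done:
                return
            for dep in upstream.get(task_id, ()):
                visit(dep)        # schedule dependencies first
            done.add(task_id)
            order.append(task_id)

        for task_id in tasks:
            visit(task_id)
        return order

    # Mirrors transform_op.set_upstream(train_op) from the DAG above.
    print(topo_order(["transform", "training"], {"transform": {"training"}}))
    # ['training', 'transform']
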
Using Airflow Python Operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`Airflow PythonOperator <https://airflow.apache.org/howto/operator.html?#pythonoperator>`_
is a built-in operator that can execute any Python callable. If you want to build the SageMaker workflow in a
more flexible way, write your Python callables for SageMaker operations by using the SageMaker Python SDK.
For example:

.. code:: python

    from sagemaker.tensorflow import TensorFlow

    # callable for SageMaker training in TensorFlow
    def train(data, **context):
        estimator = TensorFlow(entry_point='tf_train.py',
                               role='sagemaker-role',
                               framework_version='1.11.0',
                               training_steps=1000,
                               evaluation_steps=100,
                               train_instance_count=2,
                               train_instance_type='ml.p2.xlarge')
        estimator.fit(data)
        return estimator.latest_training_job.job_name

    # callable for SageMaker batch transform
    def transform(data, **context):
        training_job = context['ti'].xcom_pull(task_ids='training')
        estimator = TensorFlow.attach(training_job)
        transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
        transformer.transform(data, content_type='text/csv')

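In the callables above, the training job name returned by ``train`` reaches ``transform`` through Airflow's
XCom mechanism (the ``context['ti'].xcom_pull`` call). The stub below is a minimal stand-in for that hand-off,
not Airflow's actual implementation; the class and all names in it are illustrative:

.. code:: python

    # A minimal stand-in (not Airflow's implementation) for how a value
    # returned by one PythonOperator callable reaches another via XCom.
    class FakeTaskInstance:
        """Stores values returned by callables, keyed by task id."""

        def __init__(self):
            self._xcom = {}

        def xcom_push(self, task_id, value):
            self._xcom[task_id] = value

        def xcom_pull(self, task_ids):
            return self._xcom[task_ids]

    ti = FakeTaskInstance()

    # With provide_context=True, Airflow records the callable's return value.
    def train_stub(data, **context):
        # In the real callable this would be estimator.latest_training_job.job_name.
        return "tf-training-job-0001"

    ti.xcom_push("training", train_stub("s3://my-bucket/training-data"))

    def transform_stub(data, **context):
        # Same lookup pattern as the real transform callable above.
        return context["ti"].xcom_pull(task_ids="training")

    print(transform_stub("s3://my-bucket/transform-data", ti=ti))
    # tf-training-job-0001
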
Then build your workflow by using the PythonOperator with the Python callables defined above:

.. code:: python

    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'provide_context': True
    }

    dag = DAG('tensorflow_example', default_args=default_args,
              schedule_interval='@once')

    train_op = PythonOperator(
        task_id='training',
        python_callable=train,
        op_args=[training_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op = PythonOperator(
        task_id='transform',
        python_callable=transform,
        op_args=[transform_data_s3_uri],
        provide_context=True,
        dag=dag)

    transform_op.set_upstream(train_op)

A workflow that runs a SageMaker training job and a batch transform job is finished! You can customize your
Python callables with the SageMaker Python SDK according to your needs, and build more flexible and powerful
workflows.