What is Apache Airflow?
Apache Airflow is an open source data workflow management project originally created at Airbnb in 2014. In terms of the data workflows it covers, we can think about the following sample use cases:
- 🚀 Automating Training, Testing and Deployment of a Machine Learning Model
- Ingesting Data from Multiple REST APIs
- 🚦 Coordinating Extraction, Transformation and Loading (ETL) or Extraction, Loading and Transformation (ELT) Processes Across an Enterprise Data Lake
As we can see, one of the main features of Airflow is its flexibility: it can be used for many different data workflow scenarios. Thanks to this flexibility and its rich feature set, it has gained significant traction over the years and has been battle tested in many companies, from startups to Fortune 500 enterprises. Some examples include Spotify, Twitter, Walmart, Slack, Robinhood, Reddit, PayPal, Lyft and, of course, Airbnb.
OK, But Why?
Feature Set
- Apache Airflow works with the concept of Directed Acyclic Graphs (DAGs), which are a powerful way of defining dependencies across different types of tasks. In Apache Airflow, DAGs are developed in Python, which unlocks many interesting features from software engineering: modularity, reusability, readability, among others.
- Sensors, Hooks and Operators are the main building blocks of Apache Airflow. They provide an easy and flexible way of connecting to multiple external resources. Want to integrate with Amazon S3? Maybe you also want to add Azure Container Instances to the picture in order to run some short-lived Docker containers? Perhaps running a batch workload in an Apache Spark or Databricks cluster? Or maybe just executing some basic Python code to connect with REST APIs? Airflow ships with multiple operators, hooks and sensors out of the box, which allow for easy integration with these resources and many more, such as DockerOperator, BashOperator, HiveOperator and JdbcOperator; the list goes on. You can also build upon one of the standard operators and create your own, or simply write your own operators, hooks and sensors from scratch (a minimal sketch of a custom operator follows this list).
- The UI allows for quick and easy monitoring and management of your Airflow instance. Detailed logs also make it easier to debug your DAGs.
- It is part of the Apache Software Foundation, and the community behind it is pretty active; currently there are more than a hundred direct contributors. One might argue that open source projects always run the risk of dying at some point, but with a vibrant developer community we can say this risk is mitigated. In fact, 2020 has seen individual contributions to Airflow at an all-time high.

…and there are many more. Personally, I believe one of the fun parts of working with Airflow is discovering new and exciting features as you use it; and if you miss something, you might as well create it.
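To make the "build your own" idea a bit more concrete, here is a minimal sketch of a custom operator. It assumes the Airflow 1.10-style imports used throughout the rest of this post, and the class name and its name parameter are purely hypothetical, for illustration only:

```python
# Hypothetical example: a toy operator that logs a greeting when its task runs.
# Assumes Airflow 1.10-style imports, matching the rest of this post.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PrintGreetingOperator(BaseOperator):
    """Logs a greeting built from the `name` parameter."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(PrintGreetingOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what Airflow calls when the task instance runs;
        # its return value is pushed to XCom by default.
        message = "Hello, {}!".format(self.name)
        self.log.info(message)
        return message
```

Using it in a DAG is then just a matter of importing the class and instantiating it like any built-in operator, e.g. `PrintGreetingOperator(task_id='greet', name='Airflow')`.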
Getting Started
Time to get our hands dirty.
There are multiple ways of installing Apache Airflow. In this introduction we will cover the easiest one: installing it from PyPI with pip.
Basic Requirements
- Python 3.6+
- Pip
- Linux/Mac OS. For those running Windows, enable the Windows Subsystem for Linux (WSL), install Ubuntu 18.04 LTS from the Microsoft Store and be happy :)
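If you want to double-check that the basics are in place, something like the following should do (output will vary from machine to machine):

```bash
# Confirm that Python 3.6+ and pip are available
python3 --version
python3 -m pip --version
```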
Initial Setup
- Create a new directory for your Airflow project (e.g. "airflow-intro")
- From your new directory, create and activate a new virtual environment for your Airflow project using venv
```bash
# Run this from the newly created directory to create the venv
python3 -m venv venv

# Activate your venv
source venv/bin/activate
```
- Install apache-airflow through pip
pip install apache-airflow
Before proceeding, it is important to briefly discuss one of Airflow's core components: the Executor. The name is pretty self-explanatory: this component handles the coordination and execution of different tasks across multiple DAGs.
There are many types of executors in Apache Airflow, such as the SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor and others. For the sake of this tutorial, we will focus on the SequentialExecutor. It provides very basic functionality and has one main limitation: it cannot execute tasks in parallel. This is because it leverages a SQLite database as the backend (which can only handle one connection at a time), so parallel task execution is not supported. It is therefore not recommended for a production setup, but it is not an issue for our case.
Going back to our example, we need to initialize our backend database. But before that, we must override the AIRFLOW_HOME environment variable, so that Airflow uses our current directory as its home.
export AIRFLOW_HOME=$(pwd)
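A quick sanity check before moving on; remember that this variable has to be re-exported in every new terminal you open for this project:

```bash
# Should print the path of your project directory (e.g. .../airflow-intro)
echo $AIRFLOW_HOME
```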
Now we can initialize our Airflow database. We can do this by simply executing the following:
airflow initdb
Take a look at the output and make sure that no error messages are displayed. If everything went well, you should now see the following files in your directory:
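The exact contents vary slightly between Airflow versions, but with the 1.10-style install used here you would typically see something like:

```
airflow-intro/
├── airflow.cfg     # main Airflow configuration file
├── airflow.db      # SQLite database created by initdb
├── unittests.cfg   # configuration used by Airflow's unit tests
└── venv/           # the virtual environment created earlier
```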
- To confirm that the initialization worked, quickly inspect airflow.cfg and check that the paths in the [core] section point to your working directory, as in the sketch below. If they do, you should be good to go.
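For illustration, the relevant [core] entries typically look something like the snippet below; the paths are placeholders for your own project directory, and exact keys and sections can differ between Airflow versions:

```ini
[core]
# Both of these should point at your AIRFLOW_HOME (placeholder paths)
dags_folder = /home/your_user/airflow-intro/dags
base_log_folder = /home/your_user/airflow-intro/logs

# SQLite metadata database created by `airflow initdb`
sql_alchemy_conn = sqlite:////home/your_user/airflow-intro/airflow.db

# Default executor when using a SQLite backend
executor = SequentialExecutor

# Set to False to hide the bundled example DAGs (see the optional note below)
load_examples = True
```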
Optional: Airflow ships with many sample DAGs, which might help you get up to speed with how they work. While they are certainly helpful, they can make your UI convoluted. You can set load_examples to False, so that you will see only your own DAGs in the Airflow UI.
DAG Creation
We will start with a really basic DAG, which will do two simple tasks:
- Create a text file
- Rename this text file
To get started, create a new Python script file named simple_bash_dag.py inside your dags folder (by default this is $AIRFLOW_HOME/dags; create the folder if it does not exist yet). In this script, we must first import some modules:
```python
# Python standard modules
from datetime import datetime, timedelta

# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
```
We now proceed to creating a DAG object. In order to do that, we must specify some basic parameters, such as when it becomes active, how often we want it to run, how many retries should be made in case any of its tasks fail, and so on. So let's define these parameters:
```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Start on 27th of June, 2020
    'start_date': datetime(2020, 6, 27),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # In case of errors, do one retry
    'retries': 1,
    # Do the retry with 30 seconds delay after the error
    'retry_delay': timedelta(seconds=30),
    # Run once every 15 minutes (note: the DAG below overrides this with
    # schedule_interval=None, so it will only run when triggered manually)
    'schedule_interval': '*/15 * * * *',
}
```
We have defined our parameters. Now it is time to actually tell our DAG what it is supposed to do. We do this by declaring two different tasks, t1 and t2. We must also define which task depends on the other.
```python
with DAG(
    dag_id='simple_bash_dag',
    default_args=default_args,
    schedule_interval=None,
    tags=['my_dags'],
) as dag:

    # Here we define our first task
    t1 = BashOperator(bash_command="touch ~/my_bash_file.txt",
                      task_id="create_file")

    # Here we define our second task
    t2 = BashOperator(bash_command="mv ~/my_bash_file.txt ~/my_bash_file_changed.txt",
                      task_id="change_file_name")

    # Configure t2 to be dependent on t1's execution
    t1 >> t2
```
And as simple as that, we have finished creating our DAG 🎉
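Optionally, before starting the full Airflow stack, you can sanity-check a single task straight from the command line. With the 1.10-style CLI assumed throughout this post, airflow test runs one task instance locally without recording its state in the database:

```bash
# Run only the first task of our DAG for a given execution date
airflow test simple_bash_dag create_file 2020-06-27
```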
Testing our DAG
Let's see what our DAG looks like and, most importantly, whether it works.
To do this, we must start two Airflow components:
- The Scheduler, which controls the flow of our DAGs
airflow scheduler
- The Web Server, a UI which allows us to control and monitor our DAGs
airflow webserver
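Note that these are two long-running processes, so it is usually easiest to start each one in its own terminal. If you do, remember to activate the virtual environment and re-export AIRFLOW_HOME in each terminal, for example:

```bash
# In every new terminal opened at the project directory
source venv/bin/activate
export AIRFLOW_HOME=$(pwd)
airflow webserver   # or: airflow scheduler
```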
Both commands should start up and keep streaming log output without error messages; the webserver also prints the address it is listening on (port 8080 by default).
Showtime
We should now be ready to look at our Airflow UI and test our DAG.
Just fire up your browser and go to http://localhost:8080. Once you hit Enter, the Airflow UI should be displayed.
Look for our DAG, simple_bash_dag, and click on the toggle to its left so that it is activated. Finally, on the right-hand side, click on the play button ▶ to trigger the DAG manually.
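If you prefer the command line, the same can be achieved with the 1.10-style CLI, by unpausing the DAG and then triggering a run manually:

```bash
airflow unpause simple_bash_dag
airflow trigger_dag simple_bash_dag
```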
Clicking on the DAG enables us to see the status of the latest runs. If we click on the Graph View, we should see a graphical representation of our DAG, along with the color codes indicating the execution status of each task.

As we can see, our DAG has run successfully.
We can also confirm that by looking at our home directory:
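For instance, a quick listing should show that the file was created and then renamed by the second task (assuming both tasks finished successfully):

```bash
ls ~ | grep my_bash_file
# expected output:
# my_bash_file_changed.txt
```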
Wrap Up
- We had an introduction to Airflow, how it is used in different companies and how it can help us set up different types of data pipelines
- We were able to install, set up and run a simple Airflow environment using a SQLite backend and the SequentialExecutor
- We used the BashOperator to run simple file creation and manipulation logic
There are many nice things you can do with Airflow, and I hope my post helped you get started. If you have any questions or suggestions, feel free to comment.