airflow探索篇

栏目: Python · 发布时间: 8年前

内容简介:airflow探索篇
airflow是一个 Airbnb 的 Workflow 开源项目,在Github 上已经有超过两千星。data pipeline调度和监控工作流的平台,用于用来创建、监控和调整data pipeline。类似的产品有:Azkaban、oozie

pip方式安装

默认已经安装python >= 2.7 以及 pip

安装可以参考这篇,比较详细。 airflow安装以及celery方式启动

重要说明

使用 mysql 需要安装

python 2 : pip install MySQL-python
python 3 : pip install PyMySQL

AIRFLOW_HOME配置说明

上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME设置目录在airflow initdb的时候初始化,存放airflow的配置文件airflow.cfg及相关文件。

DAG说明-管理建议

默认$AIRFLOW_HOME/dags存放定义的dag,可以分目录管理dag。常用管理dag做法,dag存放另一个目录通过git管理,并设置软连接映射到$AIRFLOW_HOME/dag。好处方便dag编辑变更,同时dag变更不会出现编辑到一半的时候就加载到airflow中。

plugins说明-算子定义

默认$AIRFLOW_HOME/plugins存放定义的plugins,自定义组件。可以自定义operator,hook等等。我们希望可以直接使用这种模式定义机器学习的一个算子。下面定义了一个简单的加法算子。

# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
        super(PlusOperator, self).__init__(*args, **kwargs)
        self.params = params or {}
        self.set_context = set_context

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            self.op_kwargs = context

        puls = self.op_kwargs['a'] + self.op_kwargs['b']
        print "a =", self.op_kwargs['a'], ". b=", self.op_kwargs['a']
        return_value = self.main()
        context[self.task_id].xcom_push(key='return_value', value=return_value)
        return puls


# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    name = "plus_plugin"
    operators = [PlusOperator]

在dag中使用案例如下

from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id='plus_task', provide_context=True, params={'a': 1,'b':2},dag=dag)

一些命令说明

命令 说明
airflow webserver -p 8091 8091启动webserver,通过页面查询不需要可以不启动
airflow scheduler 调度器,必须启动,不然dag没法run起来(使用CeleryExecutor、LocalExecutor时)
airflow run dagid [time] run task instance
airflow backfill [dagid] -s[startTime] -e [endTime] run a backfill over 2 days
run的demo
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-11

# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11

基于CeleryExecutor方式的系统架构

使用celery方式的系统架构图(官方推荐使用这种方式,同时支持mesos方式部署)。turing为外部系统,GDags服务帮助拼接成dag,可以忽略。

  • 1.master节点webui管理dags、日志等信息。scheduler负责调度,只支持单节点,多节点启动scheduler可能会挂掉
  • 2.worker负责执行具体dag中的task。这样不同的task可以在不同的环境中执行。

airflow探索篇

基于LocalExecutor方式的系统架构图

另一种启动方式的思考,一个dag分配到1台机器上执行。如果task不复杂同时task环境相同,可以采用这种方式,方便扩容、管理,同时没有master单点问题。

airflow探索篇

基于源码的启动以及二次开发

很多情况airflow是不满足我们需求,就需要自己二次开发,这时候就需要基于源码方式启动。比如日志我们期望通过http的方式提供出来,同其他系统查看。airflow自动的webserver只提供页面查询的方式。

下载源码

github源码地址 : [ https://github.com/apache/inc... ]

git clone git@github.com:apache/incubator-airflow.git

切换分支

master分支的表初始化有坑,mysql设置的 sql 校验安全级别过高一直建表不成功。这个坑被整的有点惨。v1-8-stable或者v1-9-stable分支都可以。

git checkout v1-8-stable

安装必要 Python

进入incubator-airflow,python setup.py install (没啥文档说明,又是一个坑。找了半天)

初始化

直接输入airflow initdb(python setup.py install这个命令会将airflow安装进去)

修改配置

进入$AIRFLOE_HOME (默认在~/airflow),修改airflow.cfg,修改mysql配置。可以查看上面推荐的文章以及上面的[使用mysql需要安装]

启动

airflow webserver -p 8085

airflow scheduler

获取日志信息的改造

1.进入incubator-airflow/airflow/www/

2.修改views.py

在 class Airflow(BaseView)中添加下面代码

@expose('/logs')
    @login_required
    @wwwutils.action_logging
    def logs(self):
        BASE_LOG_FOLDER = os.path.expanduser(
            conf.get('core', 'BASE_LOG_FOLDER'))
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dag = dagbag.get_dag(dag_id)
        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
            **locals())
        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
        loc = loc.format(**locals())
        log = ""
        TI = models.TaskInstance
        session = Session()
        dttm = dateutil.parser.parse(execution_date)
        ti = session.query(TI).filter(
            TI.dag_id == dag_id, TI.task_id == task_id,
            TI.execution_date == dttm).first()
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})

        if ti:
            host = ti.hostname
            log_loaded = False

            if os.path.exists(loc):
                try:
                    f = open(loc)
                    log += "".join(f.readlines())
                    f.close()
                    log_loaded = True
                except:
                    log = "*** Failed to load local log file: {0}.\n".format(loc)
            else:
                WORKER_LOG_SERVER_PORT = \
                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
                url = os.path.join(
                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
                ).format(**locals())
                log += "*** Log file isn't local.\n"
                log += "*** Fetching here: {url}\n".format(**locals())
                try:
                    import requests
                    timeout = None  # No timeout
                    try:
                        timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                    except (AirflowConfigException, ValueError):
                        pass

                    response = requests.get(url, timeout=timeout)
                    response.raise_for_status()
                    log += '\n' + response.text
                    log_loaded = True
                except:
                    log += "*** Failed to fetch log file from worker.\n".format(
                        **locals())

            if not log_loaded:
                # load remote logs
                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
                remote_log = os.path.join(remote_log_base, log_relative)
                log += '\n*** Reading remote logs...\n'

                # S3
                if remote_log.startswith('s3:/'):
                    log += log_utils.S3Log().read(remote_log, return_error=True)

                # GCS
                elif remote_log.startswith('gs:/'):
                    log += log_utils.GCSLog().read(remote_log, return_error=True)

                # unsupported
                elif remote_log:
                    log += '*** Unsupported remote log location.'

            session.commit()
            session.close()

        if PY2 and not isinstance(log, unicode):
            log = log.decode('utf-8')

        title = "Log"

        return wwwutils.json_response(log)

3.重启服务,访问url如:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

就可以拿到这个任务在execution_date=2018-01-11的日志

异步任务思考

案例:task通过http请求大数据操作,拆分一些数据,存入一些临时表。

方案:

1.新建一张task实例的状态表如:task_instance_state。

2.扩展一个plugins,如:AsyncHttpOperator。AsyncHttpOperator实现逻辑:

  • 在task_instance_state插入一条running状态记录running。
  • 发送http请求给大数据平台,操作数据。
  • 轮询查询task_instance_state状态是成功、失败、running。如是running则继续轮询,成功、失败操作相应后续操作。

3.提供一个restful api update task_instance_state,供大数据平台回调,修改任务实例状态。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Professional JavaScript for Web Developers

Professional JavaScript for Web Developers

Nicholas C. Zakas / Wrox / 2009-1-14 / USD 49.99

This eagerly anticipated update to the breakout book on JavaScript offers you an in-depth look at the numerous advances to the techniques and technology of the JavaScript language. You'll see why Java......一起来看看 《Professional JavaScript for Web Developers》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具