内容简介:airflow探索篇
airflow是一个 Airbnb 的 Workflow 开源项目,在Github 上已经有超过两千星。data pipeline调度和监控工作流的平台,用于用来创建、监控和调整data pipeline。类似的产品有:Azkaban、oozie
pip方式安装
默认已经安装python >= 2.7 以及 pip
安装可以参考这篇,比较详细。 airflow安装以及celery方式启动
重要说明
使用 mysql 需要安装
python 2 : pip install MySQL-python python 3 : pip install PyMySQL
AIRFLOW_HOME配置说明
上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME设置目录在airflow initdb的时候初始化,存放airflow的配置文件airflow.cfg及相关文件。
DAG说明-管理建议
默认$AIRFLOW_HOME/dags存放定义的dag,可以分目录管理dag。常用管理dag做法,dag存放另一个目录通过git管理,并设置软连接映射到$AIRFLOW_HOME/dag。好处方便dag编辑变更,同时dag变更不会出现编辑到一半的时候就加载到airflow中。
plugins说明-算子定义
默认$AIRFLOW_HOME/plugins存放定义的plugins,自定义组件。可以自定义operator,hook等等。我们希望可以直接使用这种模式定义机器学习的一个算子。下面定义了一个简单的加法算子。
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):
@apply_defaults
def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
super(PlusOperator, self).__init__(*args, **kwargs)
self.params = params or {}
self.set_context = set_context
def execute(self, context):
if self.provide_context:
context.update(self.op_kwargs)
self.op_kwargs = context
puls = self.op_kwargs['a'] + self.op_kwargs['b']
print "a =", self.op_kwargs['a'], ". b=", self.op_kwargs['a']
return_value = self.main()
context[self.task_id].xcom_push(key='return_value', value=return_value)
return puls
# Defining the plugin class
class PlusPlugin(AirflowPlugin):
name = "plus_plugin"
operators = [PlusOperator]
在dag中使用案例如下
from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id='plus_task', provide_context=True, params={'a': 1,'b':2},dag=dag)
一些命令说明
| 命令 | 说明 |
|---|---|
| airflow webserver -p 8091 | 8091启动webserver,通过页面查询不需要可以不启动 |
| airflow scheduler | 调度器,必须启动,不然dag没法run起来(使用CeleryExecutor、LocalExecutor时) |
| airflow run dagid [time] | run task instance |
| airflow backfill [dagid] -s[startTime] -e [endTime] | run a backfill over 2 days |
run的demo # run your first task instance airflow run example_bash_operator runme_0 2018-01-11 # run a backfill over 2 days airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11
基于CeleryExecutor方式的系统架构
使用celery方式的系统架构图(官方推荐使用这种方式,同时支持mesos方式部署)。turing为外部系统,GDags服务帮助拼接成dag,可以忽略。
- 1.master节点webui管理dags、日志等信息。scheduler负责调度,只支持单节点,多节点启动scheduler可能会挂掉
- 2.worker负责执行具体dag中的task。这样不同的task可以在不同的环境中执行。
基于LocalExecutor方式的系统架构图
另一种启动方式的思考,一个dag分配到1台机器上执行。如果task不复杂同时task环境相同,可以采用这种方式,方便扩容、管理,同时没有master单点问题。
基于源码的启动以及二次开发
很多情况airflow是不满足我们需求,就需要自己二次开发,这时候就需要基于源码方式启动。比如日志我们期望通过http的方式提供出来,同其他系统查看。airflow自动的webserver只提供页面查询的方式。
下载源码
github源码地址 : [ https://github.com/apache/inc... ]
git clone git@github.com:apache/incubator-airflow.git
切换分支
master分支的表初始化有坑,mysql设置的 sql 校验安全级别过高一直建表不成功。这个坑被整的有点惨。v1-8-stable或者v1-9-stable分支都可以。
git checkout v1-8-stable
安装必要 Python 包
进入incubator-airflow,python setup.py install (没啥文档说明,又是一个坑。找了半天)
初始化
直接输入airflow initdb(python setup.py install这个命令会将airflow安装进去)
修改配置
进入$AIRFLOE_HOME (默认在~/airflow),修改airflow.cfg,修改mysql配置。可以查看上面推荐的文章以及上面的[使用mysql需要安装]
启动
airflow webserver -p 8085
airflow scheduler
获取日志信息的改造
1.进入incubator-airflow/airflow/www/
2.修改views.py
在 class Airflow(BaseView)中添加下面代码
@expose('/logs')
@login_required
@wwwutils.action_logging
def logs(self):
BASE_LOG_FOLDER = os.path.expanduser(
conf.get('core', 'BASE_LOG_FOLDER'))
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dag = dagbag.get_dag(dag_id)
log_relative = "{dag_id}/{task_id}/{execution_date}".format(
**locals())
loc = os.path.join(BASE_LOG_FOLDER, log_relative)
loc = loc.format(**locals())
log = ""
TI = models.TaskInstance
session = Session()
dttm = dateutil.parser.parse(execution_date)
ti = session.query(TI).filter(
TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == dttm).first()
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
if ti:
host = ti.hostname
log_loaded = False
if os.path.exists(loc):
try:
f = open(loc)
log += "".join(f.readlines())
f.close()
log_loaded = True
except:
log = "*** Failed to load local log file: {0}.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
url = os.path.join(
"http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
).format(**locals())
log += "*** Log file isn't local.\n"
log += "*** Fetching here: {url}\n".format(**locals())
try:
import requests
timeout = None # No timeout
try:
timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
except (AirflowConfigException, ValueError):
pass
response = requests.get(url, timeout=timeout)
response.raise_for_status()
log += '\n' + response.text
log_loaded = True
except:
log += "*** Failed to fetch log file from worker.\n".format(
**locals())
if not log_loaded:
# load remote logs
remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_log = os.path.join(remote_log_base, log_relative)
log += '\n*** Reading remote logs...\n'
# S3
if remote_log.startswith('s3:/'):
log += log_utils.S3Log().read(remote_log, return_error=True)
# GCS
elif remote_log.startswith('gs:/'):
log += log_utils.GCSLog().read(remote_log, return_error=True)
# unsupported
elif remote_log:
log += '*** Unsupported remote log location.'
session.commit()
session.close()
if PY2 and not isinstance(log, unicode):
log = log.decode('utf-8')
title = "Log"
return wwwutils.json_response(log)
3.重启服务,访问url如:
http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11
就可以拿到这个任务在execution_date=2018-01-11的日志
异步任务思考
案例:task通过http请求大数据操作,拆分一些数据,存入一些临时表。
方案:
1.新建一张task实例的状态表如:task_instance_state。
2.扩展一个plugins,如:AsyncHttpOperator。AsyncHttpOperator实现逻辑:
- 在task_instance_state插入一条running状态记录running。
- 发送http请求给大数据平台,操作数据。
- 轮询查询task_instance_state状态是成功、失败、running。如是running则继续轮询,成功、失败操作相应后续操作。
3.提供一个restful api update task_instance_state,供大数据平台回调,修改任务实例状态。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
An Introduction to Probability Theory and Its Applications
William Feller / Wiley / 1991-1-1 / USD 120.00
Major changes in this edition include the substitution of probabilistic arguments for combinatorial artifices, and the addition of new sections on branching processes, Markov chains, and the De Moivre......一起来看看 《An Introduction to Probability Theory and Its Applications》 这本书的介绍吧!