内容简介:最近几年里,Python 已成为数据科学、机器学习和深度学习领域的一门流行的编程语言。只需再配上查询语言 SQL 即可完成大多数工作。SQL 很棒,用英语即可发出指令,且只需指示想要什么,而无需关心具体如何查询。这使得底层的查询引擎可以不改变 SQL 查询就能对其进行优化。Python 也很棒,它有大量高质量的库,本身也易于使用。作业编排是执行日常任务并使其自动化的行为。在过去,这通常是通过 CRON 作业完成的。而在最近几年,越来越多的企业开始使用Apache Airflow 和Spotify 的 Lu
最近几年里,Python 已成为数据科学、机器学习和深度学习领域的一门流行的编程语言。只需再配上查询语言 SQL 即可完成大多数工作。SQL 很棒,用英语即可发出指令,且只需指示想要什么,而无需关心具体如何查询。这使得底层的查询引擎可以不改变 SQL 查询就能对其进行优化。Python 也很棒,它有大量高质量的库,本身也易于使用。
作业编排是执行日常任务并使其自动化的行为。在过去,这通常是通过 CRON 作业完成的。而在最近几年,越来越多的企业开始使用Apache Airflow 和Spotify 的 Luigi 等创建更强大的系统。这些 工具 可以监控作业、记录结果并在发生故障时重新运行作业。如果您有兴趣,我曾写过一篇博客文章,其中包括 Airflow 的背景故事,题为 《使用 Airflow 构建数据管道》 。
作为数据探索和可视化工具的 Notebooks 在过去几年中也在数据领域变得非常流行。像Jupyter Notebook 和Apache Zeppelin 这样的工具旨在满足这一需求。Notebooks 不仅向您显示分析结果,还显示产生这些结果的代码和查询。这有利于发现疏忽并可帮助分析师重现彼此的工作。
Airflow 和 Jupyter Notebook 可以很好地协同工作,您可以使用 Airflow 自动将新数据输入数据库,然后数据科学家可以使用 Jupyter Notebook 进行分析。
在这篇博文中,我将安装一个单节点的 Hadoop,让 Jupyter Notebook 运行并展示如何创建一个 Airflow 作业,它可以获取天气数据源,将其存储在 HDFS 上,再转换为 ORC 格式,最后导出到 Microsoft Excel 格式的电子表格中。
我正在使用的机器有一个主频为 3.40 GHz 的 Intel Core i5-4670K CPU、12 GB 的 RAM 和 200 GB 的 SSD。我将使用全新安装的 Ubuntu 16.04.2 LTS,并根据我的博客文章 《Hadoop 3:单节点安装指南》 中的说明构建安装单节点 Hadoop。
安装依赖项
接下来将安装 Ubuntu 上的依赖项。 git 包将用于从 GitHub 获取天气数据集,其余三个包是 Python 本身、Python 包安装程序和 Python 环境隔离工具包。
$ sudo apt install \ git \ python \ python-pip \ virtualenv 复制代码
Airflow 将依靠 RabbitMQ 的帮助来跟踪其作业。下面安装 Erlang,这是编写 RabbitMQ 的语言。
$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \ sudo tee /etc/apt/sources.list.d/erlang.list $ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \ sudo apt-key add - $ sudo apt update $ sudo apt install esl-erlang 复制代码
下面安装 RabbitMQ。
$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list $ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \ sudo apt-key add - $ sudo apt update $ sudo apt install rabbitmq-server 复制代码
下面将安装此博文中使用的 Python 上的依赖项和应用程序。
$ virtualenv .notebooks $ source .notebooks/bin/activate $ pip install \ apache-airflow \ celery \ cryptography \ jupyter \ jupyterthemes \ pyhive \ requests \ xlsxwriter 复制代码
配置 Jupyter Notebook
我将为 Jupyter 创建一个文件夹来存储其配置,然后为服务器设置密码。如果不设置密码,您就会获得一个冗长的 URL,其中包含用于访问 Jupyter 网页界面的密钥。每次启动 Jupyter Notebook 时,密钥都会更新。
$ mkdir -p ~/.jupyter/ $ jupyter notebook password 复制代码
Jupyter Notebook 支持用户界面主题。以下命令将主题设置为 Chesterish 。
$ jt -t chesterish 复制代码
下面命令列出当前安装的主题。内置的主题在 GitHub上都有 屏幕截图 。
$ jt -l 复制代码
要返回默认主题,请运行以下命令。
$ jt -r 复制代码
通过 Jupyter Notebook 查询 Spark
首先确保您运行着 Hive 的 Metastore、Spark 的 Master & Slaves 服务,以及 Presto 的服务端。以下是启动这些服务的命令。
$ hive --service metastore & $ sudo /opt/presto/bin/launcher start $ sudo /opt/spark/sbin/start-master.sh $ sudo /opt/spark/sbin/start-slaves.sh 复制代码
下面将启动 Jupyter Notebook,以便您可以与 PySpark 进行交互,PySpark 是 Spark 的基于 Python 的编程接口。
$ PYSPARK_DRIVER_PYTHON=ipython \ PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=0.0.0.0 --NotebookApp.iopub_data_rate_limit=100000000" \ pyspark \ --master spark://ubuntu:7077 复制代码
请注意,上面的 master 的 URL 以 ubuntu 为主机名。此主机名是 Spark Master 服务端绑定的主机名。如果无法连接到 Spark,请检查 Spark Master 服务端的日志,查找它已选择绑定的主机名,因为它不接受寻址其他主机名的连接。这可能会令人困惑,因为您通常会期望像 localhost 这样的主机名无论如何都能正常工作。
运行 Jupyter Notebook 服务后,用下面命令打开网页界面。
$ open http://localhost:8888/ 复制代码
系统将提示您输入为 Jupyter Notebook 设置的密码。在右上角输入后,您可以从下拉列表中创建新的笔记本。我们感兴趣的两种笔记本类型是 Python 和终端。终端笔记本使用您启动 Jupyter Notebook 的 UNIX 帐户为您提供 shell 访问权限。而我将使用的是 Python 笔记本。
启动 Python 笔记本后,将以下代码粘贴到单元格中,它将通过 Spark 查询数据。调整查询以使用您在安装中创建的数据集。
cab_types = sqlContext.sql(""" SELECT cab_type, COUNT(*) FROM trips_orc GROUP BY cab_type """) cab_types.take(2) 复制代码
这就是上面查询的输出结果。只返回了一条记录,包括两个字段。
[Row(cab_type=u'yellow', count(1)=20000000)] 复制代码
通过 Jupyter Notebook 查询 Presto
在前面用来查询 Spark 的笔记本中,也可以查询 Presto。某些 Presto 查询的性能可能超过 Spark,趁手的是这两者可以在同一个笔记本中进行切换。在下面的示例中,我使用 Dropbox 的 PyHive 库来查询 Presto。
from pyhive import presto cursor = presto.connect('0.0.0.0').cursor() cursor.execute('SELECT * FROM trips_orc LIMIT 10') cursor.fetchall() 复制代码
这是上述查询的部分输出。
[(451221840, u'CMT', u'2011-08-23 21:03:34.000', u'2011-08-23 21:21:49.000', u'N', 1, -74.004655, 40.742162, -73.973489, 40.792922, ... 复制代码
如果您想在 Jupyter Notebook 中生成数据图表,可以看看 《在 Jupyter Notebook 中使用 SQLite 可视化数据》 这篇博文,因为它有几个使用 SQL 的绘图示例,可以与 Spark 和 Presto 一起使用。
启动 Airflow
下面将创建一个 ~/airflow 文件夹,设置一个用于存储在网页界面上设置的 Airflow 的状态和配置集的 SQLite 3 数据库,升级配置模式并为 Airflow 将要运行的 Python 作业代码创建一个文件夹。
$ cd ~ $ airflow initdb $ airflow upgradedb $ mkdir -p ~/airflow/dags 复制代码
默认情况下,Presto、Spark 和 Airflow 的网页界面都使用 TCP 8080 端口。如果您先启动了 Spark,Presto 就将无法启动。但如果您是在 Presto 之后启动 Spark,那么 Presto 将在 8080 上启动,而 Spark Master 服务端则会使用 8081,如果仍被占用,会继续尝试更高端口,直到它找到一个空闲的端口。之后, Spark 将为 Spark Worker 的网页界面选择更高的端口号。这种重叠通常不是问题,因为在生产设置中这些服务通常存在于不同的机器上。
因为此安装中使用了 8080 - 8082 的 TCP 端口,我将在端口 8083 上启动 Airflow 的网页界面。
$ airflow webserver --port=8083 & 复制代码
我经常使用以下命令之一来查看正在使用的网络端口。
$ sudo lsof -OnP | grep LISTEN $ netstat -tuplen $ ss -lntu 复制代码
Airflow 的 Celery 代理和作业结果的存储都默认使用 MySQL。这里改为使用 RabbitMQ。
$ vi ~/airflow/airflow.cfg 复制代码
找到并编辑以下设置。
broker_url = amqp://airflow:airflow@localhost:5672/airflow celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow 复制代码
上面使用了 airflow 作为用户名和密码连接到 RabbitMQ。账号密码可以随意自定。
下面将为 RabbitMQ 配置上述账号密码,以便它能访问 Airflow 虚拟主机。
$ sudo rabbitmqctl add_vhost airflow $ sudo rabbitmqctl add_user airflow airflow $ sudo rabbitmqctl set_user_tags airflow administrator $ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*" 复制代码
将 Airflow 连接到 Presto
下面将打开 Airflow 网页界面。
$ open http://localhost:8083/ 复制代码
打开 Airflow 网页界面后,单击顶部的 “Admin” 导航菜单,然后选择 “Connections”。您将看到一长串默认数据库连接。单击以编辑 Presto 连接。 Airflow 连接到 Presto 需要进行以下更改。
- 将 schema 从 hive 改为 default。
- 将端口从 3400 改为 8080。
保存这些更改,然后单击顶部的 “Data Profiling” 导航菜单,选择 “Ad Hoc Query”。从查询框上方的下拉列表中选择 “presto_default”,您就应该可以通过 Presto 执行 SQL 代码了。下面是针对我在安装中导入的数据集运行的示例查询。
SELECT count(*) FROM trips_orc; 复制代码
下载天气数据集
可以将Airflow DAG 视为定时执行的作业。在下面的示例中,我将在 GitHub 上获取FiveThirtyEight 数据仓库 提供的天气数据,将其导入 HDFS,将其从 CSV 转换为 ORC 并将其从 Presto 导出为 Microsoft Excel 格式。
以下内容将 FiveThirtyEight 的数据存储克隆到名为 data 的本地文件夹中。
$ git clone \ https://github.com/fivethirtyeight/data.git \ ~/data 复制代码
然后我将启动 Hive 并创建两个表。一个存数据集的 CSV 格式,另一个存数据集的 Presto 和 Spark 友好的 ORC 格式。
$ hive 复制代码
CREATE EXTERNAL TABLE weather_csv ( date_ DATE, actual_mean_temp SMALLINT, actual_min_temp SMALLINT, actual_max_temp SMALLINT, average_min_temp SMALLINT, average_max_temp SMALLINT, record_min_temp SMALLINT, record_max_temp SMALLINT, record_min_temp_year INT, record_max_temp_year INT, actual_precipitation DECIMAL(18,14), average_precipitation DECIMAL(18,14), record_precipitation DECIMAL(18,14) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/weather_csv/'; CREATE EXTERNAL TABLE weather_orc ( date_ DATE, actual_mean_temp SMALLINT, actual_min_temp SMALLINT, actual_max_temp SMALLINT, average_min_temp SMALLINT, average_max_temp SMALLINT, record_min_temp SMALLINT, record_max_temp SMALLINT, record_min_temp_year INT, record_max_temp_year INT, actual_precipitation DOUBLE, average_precipitation DOUBLE, record_precipitation DOUBLE ) STORED AS orc LOCATION '/weather_orc/'; 复制代码
创建 Airflow DAG
下面的 Python 代码是 Airflow 作业(也称为DAG)。每隔 30 分钟,它将执行以下操作。
- 清除 HDFS上 /weather_csv/ 文件夹中的任何现有数据。
- 将 ~/data 文件夹中的 CSV 文件复制到 HDFS 上的 /weather_csv/ 文件夹中。
- 使用 Hive 将 HDFS 上的 CSV 数据转换为 ORC 格式。
- 使用 Presto 将 ORC 格式的数据导出为 Microsoft Excel 2013 格式。
在下面的 Python 代码中有一个指向 CSV 的位置,完整路径为 /home/mark/data/us-weather-history/*.csv,请将其中的 “mark” 更换为您自己的 UNIX 用户名。
$ vi ~/airflow/dags/weather.py 复制代码
from datetime import timedelta import airflow from airflow.hooks.presto_hook import PrestoHook from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator import numpy as np import pandas as pd default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(0), 'email': ['airflow@example.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=15), } dag = airflow.DAG('weather', default_args=default_args, description='将天气数据复制到 HDFS 并导出为 Excel', schedule_interval=timedelta(minutes=30)) cmd = "hdfs dfs -rm /weather_csv/*.csv || true" remove_csvs_task = BashOperator(task_id='remove_csvs', bash_command=cmd, dag=dag) cmd = """hdfs dfs -copyFromLocal \ /home/mark/data/us-weather-history/*.csv \ /weather_csv/""" csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs', bash_command=cmd, dag=dag) cmd = """echo \"INSERT INTO weather_orc SELECT * FROM weather_csv;\" | \ hive""" csv_to_orc_task = BashOperator(task_id='csv_to_orc', bash_command=cmd, dag=dag) def presto_to_excel(**context): column_names = [ "date", "actual_mean_temp", "actual_min_temp", "actual_max_temp", "average_min_temp", "average_max_temp", "record_min_temp", "record_max_temp", "record_min_temp_year", "record_max_temp_year", "actual_precipitation", "average_precipitation", "record_precipitation" ] sql = """SELECT * FROM weather_orc LIMIT 20""" ph = PrestoHook(catalog='hive', schema='default', port=8080) data = ph.get_records(sql) df = pd.DataFrame(np.array(data).reshape(20, 13), columns=column_names) writer = pd.ExcelWriter('weather.xlsx', engine='xlsxwriter') df.to_excel(writer, sheet_name='Sheet1') writer.save() return True presto_to_excel_task = PythonOperator(task_id='presto_to_excel', provide_context=True, python_callable=presto_to_excel, dag=dag) remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task if __name__ == "__main__": dag.cli() 复制代码
使用该代码打开 Airflow 的网页界面并将主页底部的 “weather” DAG 旁边的开关切换为 “on”。
调度程序将创建一个作业列表交给 workers 去执行。以下内容将启动 Airflow 的调度程序服务和一个将完成所有预定作业的 worker。
$ airflow scheduler & $ airflow worker & 复制代码
感谢您抽出宝贵时间阅读这篇文章。我为北美和欧洲的客户提供咨询、架构和实际开发服务。如果您有意探讨我的产品将如何帮助您的业务,请通过LinkedIn 与我联系。
如果发现译文存在错误或其他需要改进的地方,欢迎到 掘金翻译计划 对译文进行修改并 PR,也可获得相应奖励积分。文章开头的 本文永久链接 即为本文在 GitHub 上的 MarkDown 链接。
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为掘金 上的英文分享文章。内容覆盖 Android 、 iOS 、 前端 、 后端 、 区块链 、 产品 、 设计 、 人工智能 等领域,想要查看更多优质译文请持续关注 掘金翻译计划 、官方微博、 知乎专栏 。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 4 万字全面掌握数据库、数据仓库、数据集市、数据湖、数据中台
- 数据映射如何支持数据转换和数据集成?
- 大数据产品经理必备的数据挖掘知识概述(一)认识数据之数据可视化
- 避免“数据湖”成为“数据沼泽”,流动的“数据河”是关键
- Oracle数据库查询重复数据及删除重复数据方法
- Redis数据持久化、数据备份、数据的故障恢复
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。