使用Spark读取Hive中的数据

栏目: 服务器 · 发布时间: 7年前

内容简介:在默认情况下,Hive使用MapReduce来对数据进行操作和运算,即将HQL语句翻译成MapReduce作业执行。而MapReduce的执行速度是比较慢的,一种改进方案就是使用Spark来进行数据的查找和运算。Hive和Spark的结合使用有两种方式,一种称为Hive on Spark:即将Hive底层的运算引擎由MapReduce切换为Spark,官方文档在这里:因为Spark是一个更为通用的计算引擎,以后还会有更深度的使用(比如使用Spark streaming来进行实时运算),因此,我选用了Spa

在默认情况下,Hive使用MapReduce来对数据进行操作和运算,即将HQL语句翻译成MapReduce作业执行。而MapReduce的执行速度是比较慢的,一种改进方案就是使用Spark来进行数据的查找和运算。Hive和Spark的结合使用有两种方式,一种称为Hive on Spark:即将Hive底层的运算引擎由MapReduce切换为Spark,官方文档在这里: Hive on Spark: Getting Started 。还有一种方式,可以称之为Spark on Hive:即使用Hive作为Spark的数据源,用Spark来读取HIVE的表数据(数据仍存储在HDFS上)。

因为Spark是一个更为通用的计算引擎,以后还会有更深度的使用(比如使用Spark streaming来进行实时运算),因此,我选用了Spark on Hive这种解决方案,将Hive仅作为管理结构化数据的工具。本文是Spark的配置过程。

配置spark

拷贝hive-site.xml至$SPARK_HOME下,然后再其中添加下面的语句:

<property> <name>hive.metastore.uris</name> <value>thrift://192.168.1.56:9083</value> </property>

这里192.168.1.56是Hive的元数据服务的地址,9083是默认的端口号。通过这里的配置,让Spark与Hive的元数据库建立起联系,Spark就可以获得Hive中有哪些库、表、分区、字段等信息。

配置Hive的元数据,可以参考 配置Hive使用 MySql 记录元数据 。

确认Hive元数据服务已经运行

Hive的元数据服务是单独启动的,可以通过下面两种方式验证其是否启动:

# ps aux | grep hive-metastore root 10516 3.0 5.7 2040832 223484 pts/4 Sl+ 14:52 0:11 /opt/jdk/jdk1.8.0_171/jre/bin/java -Xmx256m -Djava.library.path=/opt/hadoop/hadoop-2.9.1/lib/native -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/opt/hadoop/hadoop-2.9.1/logs ...

也可以使用下面的语句,以验证端口的方式来确认服务是否启动:

# lsof -i:9083 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 10516 root 509u IPv4 6889656 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)

如果metastore服务没有启动,可以使用下面的命令启动之:

# hive --service metastore 2018-07-25 14:52:27: Starting Hive Metastore Server

编写 python 脚本,访问Hive仓库

配置完成后,就可以编写python脚本来对数据进行查询和运算了:

from pyspark.sql import SparkSession from pyspark.sql import HiveContext spark = SparkSession.builder.master("local")\ .appName("SparkOnHive")\ .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\ .enableHiveSupport()\ .getOrCreate() hiveCtx = HiveContext(spark) df = hiveCtx.sql("select * from tglog_aw_2018.golds_log limit 10") rows = df.collect() for row in rows: print " ".join([str(row[0]), row[1].encode('utf-8'), row[2].encode('utf-8'), str(row[3]), str(row[4])])

本人是使用PyCharm这个IDE进行开发的,上面引用了pyspark这个包,如何进行python的包管理可以自行百度。

将上面的代码保存至文件 golds_read.py,然后上传至已安装好spark的服务器的~/python 文件夹下。

上面的查询语句中,tglog_aw_2018是数据库名,golds_log是表名。配置HIVE并写入数据,可以参考这两篇文章:

1. linux上安装和配置Hive

2. 写入数据到Hive表(命令行)

接下来像spark提交作业,可以获得执行结果:

# spark-submit ~/python/golds_read.py 3645356 wds7654321(4171752) 妞妞拼十翻牌 1700 1526027152 2016869 dqyx123456789(2376699) 妞妞拼十翻牌 1140 1526027152 3630468 dke3776611(4156064) 妞妞拼十翻牌 1200 1526027152 3642022 黑娃123456(4168266) 妞妞拼十翻牌 500 1526027152

这个例子主要只是演示一下如何使用spark结合hive使用。spark默认支持 java 、scala和python三种语言编写的作业。可以看出,大部分的逻辑都是要通过python/java/scala编程来实现的。本人选择的是比较轻量的python,操作spark主要是要学习pySpark这个类库,它的官方地址位于: https://spark.apache.org/docs/latest/api/python/index.html

感谢阅读,希望这篇文章能给你带来帮助!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Tagging

Tagging

Gene Smith / New Riders / 2007-12-27 / GBP 28.99

Tagging is fast becoming one of the primary ways people organize and manage digital information. Tagging complements traditional organizational tools like folders and search on users desktops as well ......一起来看看 《Tagging》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具