内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。开始操作之前,需要将 hive.enforce.bucketing 属性设置为 true,以标识 Hive 可以识别桶。当数据量过大,需要庞大分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文 件系统挂掉,而且桶比分区有更高的查询效率。 数据最终落在
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
1:order by, sort by, distribute by, cluster by
-
1.1 order by
hive 中的 order by 语句会对查询结果做一次全局排序,即,所有的 mapper 产生的结果都会交给一个 reducer 去处理,无论数据量大小, job 任务只会启动一个 reducer,如果数据量巨大,则会耗费大量的时间。 提示: 如果在严格模式下, order by 需要指定 limit 数据条数,不然数据量巨大的情况下会造成崩溃无输出结果。涉及属性: set hive.mapred.mode=nonstrict/strict
select * from company_info order by money desc; 复制代码
-
1.2 sort by
hive 中的 sort by 语句会对每一块局部数据进行局部排序,即,每一个 reducer 处理的数据都是有序的,但是不能保证全局有序。
-
1.3 distribute by
hive 中的 distribute by 一般要和 sort by 一起使用,即将某一块数据归给(distribute by)某一个reducer 处理,然后在指定的 reducer 中进行 sort by 排序。
提示: distribute by 必须写在 sort by 之前
提示: 涉及属性
mapreduce.job.reduces, hive.exec.reducers.bytes.per.reducer 例如:不同的人(personId)分为不同的组,每组按照 money 排序
select * from company_info distribute by personId sort by personId, money desc; 复制代码
-
1.4 cluster by
hive 中的 cluster by 在 distribute by 和 sort by 排序字段一致的情况下是等价的。 同时, cluster by 指定的列只能是降序,即默认的 descend,而不能是 ascend。 例如: 写一个等价于 distribute by 与 sort by 的例子
select * from company_info distribute by personId sort by personId; select * from compnay_info cluster by personId; 复制代码
2: 行转列、列转行(UDAF 与 UDTF)
2.1 行转列(concat_ws)
create table person_info( name string, constellation string, blood_type string) row format delimited fields terminated by "\t"; load data local inpath “person_info.tsv” into table person_info; #collect_set(t1.name) 表示把分组后的多行值转化为集合 select t1.base, concat_ws('|', collect_set(t1.name)) name from (select name, concat(constellation, ",", blood_type) base from person_info) t1 group by t1.base; 复制代码
2.2 列转行(array< string >数组结构)
create table movie_info( movie string, category array<string>) row format delimited fields terminated by "\t" collection items terminated by ","; load data local inpath "movie_info.tsv" into table movie_info; 复制代码
-
将电影分类中的数组数据展开
select movie, category_name from movie_info lateral view explode(category) table_tmp as category_name; 复制代码
-
“fields terminated by”:字段与字段之间的分隔符。
-
“collection items terminated by”:一个字段中各个子元素 item的分隔符
-
orc 即 Optimized Row Columnar (ORC) file,在 RCFile 的基础上演化而来,可以提供一种高 效的方法在 Hive 中存储数据, 提升了读、写、 处理数据的效率。
2.3 分桶
- 直接分桶
开始操作之前,需要将 hive.enforce.bucketing 属性设置为 true,以标识 Hive 可以识别桶。
create table music( id int, name string, size float) row format delimited fields terminated by "\t" clustered by (id) into 4 buckets; 复制代码
- 在分区中分桶
当数据量过大,需要庞大分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文 件系统挂掉,而且桶比分区有更高的查询效率。 数据最终落在哪一个桶里,取决于 clustered by 的那个列的值的 hash 数与桶的个数求余来决定。 虽然有一定离散性, 但不能保证每个桶 中的数据量是一样的。
create table music2( id int, name string, size float) partitioned by (date string) clustered by (id) sorted by(size) into 4 bucket row format delimited fields terminated by "\t"; load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30'); 复制代码
3 Hive综合项目:
- 版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
3.1 项目字段说明
3.2 用户表
3.3 数据集
3.4 用户数据集
3.5 数据初步ETL
package com.z.youtube.util; public class ETLUtils { /** * 1、过滤不合法数据 * 2、去掉&符号左右两边的空格 * 3、 \t 换成&符号 * @param ori * @return */ public static String getETLString(String ori){ String[] splits = ori.split("\t"); //1、过滤不合法数据 if(splits.length < 9) return null; //2、去掉&符号左右两边的空格 splits[3] = splits[3].replaceAll(" ", ""); StringBuilder sb = new StringBuilder(); //3、 \t 换成&符号 for(int i = 0; i < splits.length; i++){ sb.append(splits[i]); if(i < 9){ if(i != splits.length - 1){ sb.append("\t"); } }else{ if(i != splits.length - 1){ sb.append("&"); } } } return sb.toString(); } } 复制代码
3.6 数据模型建模
-
创建原始表: youtube_ori, youtube_user_ori
#youtube_ori create table youtube_ori( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as textfile; #youtube_user_ori: create table youtube_user_ori( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as textfile; 复制代码
-
创建ORC表: youtube_orc, youtube_user_orc
#youtube_orc create table youtube_orc( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) clustered by (uploader) into 8 buckets row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; #youtube_user_orc: create table youtube_user_orc( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as orc; load data inpath "/youtube/output/video/2008/0222" into table youtube_ori; load data inpath "/youtube/user/2008/0903" into table youtube_user_ori; insert into table youtube_orc select * from youtube_ori; insert into table youtube_user_orc select * from youtube_user_ori; 复制代码
4 业务分析
4.1 统计视频观次数 Top10
使用 order by 按照 views 字段做一个全局 排序 即可,同时我们设置只显示前 10 条。
select videoId, uploader, age, category, length, views, rate, ratings, comments from youtube_orc order by views desc limit 复制代码
4.2 统计视频类别热度 Top10
-
- 即统计每个类别有多少个视频,显示出包含视频最多的前 10 个类别。
-
- 我们需要按照类别 group by 聚合,然后 count 组内的 videoId 个数即可。
-
- 因为当前表结构为:一个视频对应一个或多个类别。所以如果要 group by 类别,需要先 将类别进行列转行(展开),然后再进行 count 即可。
-
-
最后按照热度排序,显示前 10 条。
select category_name as category, count(t1.videoId) as hot from ( select videoId, category_name from youtube_orc lateral view explode(category) t_catetory as category_name) t1 group by t1.category_name order by hot desc limit 10; 复制代码
-
4.3 统计出视频(观看数最高的 20 个视频)的所属类别以及类别包含(这 Top20 视频的个数)
-
- 先找到观看数最高的 20 个视频所属条目的所有信息,降序排列
-
- 把这 20 条信息中的 category 分裂出来(列转行)
-
-
最后查询视频分类名称和该分类下有多少个 Top20 的视频
select category_name as category, count(t2.videoId) as hot_with_views from ( select videoId, category_name from ( select * from youtube_orc order by views desc limit 20) t1 lateral view explode(category) t_catetory as category_name) t2 group by category_name order by hot_with_views desc; 复制代码
-
4.4 统计视频观看数 Top50 所关联视频的所属类别的热度排名
-
-
查询出观看数最多的前 50 个视频的所有信息(当然包含了每个视频对应的关联视频),记 为临时表 t1
t1:观看数前 50 的视频 select * from youtube_orc order by views desc limit 50; 复制代码
-
-
-
将找到的 50 条视频信息的相关视频 relatedId 列转行,记为临时表 t2
select explode(relatedId) as videoId from t1; 复制代码
-
-
-
将相关视频的 id 和 youtube_orc 表进行 inner join 操作
t5:得到两列数据,一列是 category,一列是之前查询出来的相关视频 id (select distinct(t2.videoId), t3.category from t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name; 复制代码
-
-
-
按照视频类别进行分组,统计每组视频个数,然后排行
select category_name as category, count(t5.videoId) as hot from ( select videoId, category_name from ( select distinct(t2.videoId), t3.category from ( select explode(relatedId) as videoId from ( select * from youtube_orc order by views desc limit 50) t1) t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5 group by category_name order by hot desc; 复制代码
-
4.5 统计每个类别中的视频热度 Top10,以 Music 为例
-
-
要想统计 Music类别中的视频热度 Top10,需要先找到 Music类别,那么就需要将 category 展开,所以可以创建一张表用于存放 categoryId 展开的数据。
create table youtube_category( videoId string, uploader string, age int, categoryId string, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; 复制代码
-
-
-
向 category 展开的表中插入数据。
insert into table youtube_category select videoId, uploader, age, categoryId, length, views, rate, ratings, comments, relatedId from youtube_orc lateral view explode(category) catetory as categoryId; 复制代码
-
-
-
统计对应类别(Music)中的视频热度。
select videoId, views from youtube_category where categoryId = "Music" order by views desc limit 10; 复制代码
-
4.6 统计每个类别中视频流量Top10,以 Music 为例
-
- 创建视频类别展开表(categoryId 列转行后的表)
-
-
按照 ratings 排序即可
select videoId, views, ratings from youtube_category where categoryId = "Music" order by ratings desc limit 10;
-
4.7 统计上传视频最多的用户 Top10 以及他们上传的观看次数在前 20 的视频
-
-
先找到上传视频最多的 10 个用户的用户信息
select * from youtube_user_orc order by videos desc limit 10; 复制代码
-
-
-
通过 uploader 字段与 youtube_orc 表进行 join,得到的信息按照 views 观看次数进行排序 即可。
select t2.videoId, t2.views, t2.ratings, t1.videos, t1.friends from ( select * from youtube_user_orc order by videos desc limit 10) t1 join youtube_orc t2 on t1.uploader = t2.uploader order by views desc limit 20; 复制代码
-
4.8 统计每个类别视频观看数 Top10
-
- 先得到 categoryId 展开的表数据
-
- 子查询按照 categoryId 进行分区,然后分区内排序,并生成递增数字,该递增数字这一 列起名为 rank 列
-
-
通过子查询产生的临时表,查询 rank 值小于等于 10 的数据行即可。
select t1.* from ( select videoId, categoryId, views, row_number() over(partition by categoryId order by views desc) rank from youtube_category) t1 where rank <= 10;
-
5 JVM 堆内存溢出
描述: java.lang.OutOfMemoryError: Java heap space
Error: Java heap space堆栈空间太小了,在mapred-site.xml中设置 <name>mapred.child.java.opts</name> <value>-Xmx200m</value> 如果是新版本在这里在修改中 hadoop-env.sh export HADOOP_HEAPSIZE=2000 复制代码
解决: 在 yarn-site.xml 中加入如下代码
<property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <property> <name>mapred.child.java.opts</name> <value>-Xmx1024m</value> </property> 复制代码
6 Hive性能优化详解
- 列裁剪
Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销,中间表存储开销和数据整合开销。
set hive.optimize.cp=true; // 默认为true Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。 例如,若有以下查询: SELECT a,b FROM q WHERE e<10; 在实施此项查询中,Q 表有 5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要 的 3 列 a、b、e, 而忽略列 c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。 裁剪所对应的参数项为:hive.optimize.cp=true(默认值为真) 复制代码
-
分区裁剪
在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。 set hive.optimize.pruner=true; // 默认为true 复制代码
-
join优化:在进行join的时候,大表放在最后面,但是使用 / +streamtable(大表名称) / 来标记大表,那么大表放在什么位置都行了
select /*+streamtable(s)*/ s.ymd,d.dividend from stocks s inner join dividends d on s.ymd=d.ymd and s.symbol=d.symbol where s.symbol=’aapl’ 复制代码
-
在hive中,当对3个或更多张表进行join时,如果on条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的join on的放入一个job来节省执行时间。
-
优先过滤数据,尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与join的数据量。
-
启用mapjoin,mapjoin是将join双方比较小的表直接分发到各个map进程的内存中,在map进程中进行join操作,这样就可以不用进行reduce步骤,从而提高了速度。只有join操作才能启用mapjoin。
set hive.auto.convert.join = true; // 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。 set hive.mapjoin.smalltable.filesize = 2500000; // 刷入内存表的大小(字节) set hive.mapjoin.maxsize=1000000; // Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出 复制代码
-
尽量原子操作,尽量避免一个 SQL 包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。
-
并行执行, hive会将一个查询任务转化为一个或多个阶段。默认情况下,一次只执行一个阶段。如果某些阶段不是互相依赖的,是可以并行执行的,这样可以缩短整个job执行时间。
set hive.exec.parallel=true; // 可以开启并发执行。 set hive.exec.parallel.thread.number=16; // 同一个sql允许最大并行度,默认为8。 复制代码
-
中间数据压缩,中间数据压缩就是对hive查询的多个job之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用snappy压缩算法,该算法的压缩和解压效率都非常高。开启中间压缩(map输出结果(临时的)压缩) 。
set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK; 复制代码
-
结果数据压缩
最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间;
注:常用的gzip,snappy压缩算法是不支持并行处理的,如果数据源是gzip/snappy压缩文件大文件,这样只会有有个mapper来处理这个文件,会严重影响查询效率。所以如果结果数据需要作为其他查询任务的数据源,可以选择支持splitable的LZO算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高job执行的速度了。关于如何给Hadoop集群安装LZO压缩库可以查看这篇文章。
set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapred.output.compression.type=BLOCK: 如何给hadoop安装压缩库 https://mp.weixin.qq.com/s?__biz=MzU5OTM5NjQ2NA==∣=2247483676&idx=1&sn=2a14972e97bc6c25647e962c12ce3e77&chksm=feb4d803c9c35115c66017e077fdc4b613515d3b93206d62400213bd5ab79ae46f125333db15&scene=21#wechat_redirect Hadoop集群支持一下算法: org.apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.SnappyCodec com.hadoop.compression.lzo.LzopCodec org.apache.hadoop.io.compress.Lz4Codec 复制代码
-
本地化执行
对于小数据集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短 set mapred.job.tracker=local; set hive.exec.mode.local.auto=true; 当一个job满足下面条件才能真正使用本地模式: job的输入数据大小必须小于参数hive.exec.mode.local.inputbytes.max(默认128M) job的map数必须小于参数hive.exec.mode.local.auto.tasks.max(默认4) job的reduce数必须为0或者1 复制代码
-
Map端聚合优化
hive.map.aggr=true; // 用于设定是否在 map 端进行聚合,默认值为真 hive.groupby.mapaggr.checkinterval=100000; // 用于设定 map 端进行聚合操作的条目数 复制代码
-
合并小文件
在执行MapReduce程序的时候,一般情况是一个文件需要一个mapper来处理。但是如果数据源是大量的小文件,这样岂不是会启动大量的mapper任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少mapper任务数量.
Hadoop小文件问题解决方案 https://mp.weixin.qq.com/s?__biz=MzU5OTM5NjQ2NA==∣=2247483659&idx=1&sn=28a7d3e2c0bd87fa4239719b1b360aed&chksm=feb4d814c9c35102ad9f018342307e3fe86f06cfeb14d522bf563b0c61bba061907065490eba&scene=21#wechat_redirect set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; // Map端输入、合并文件之后按照block的大小分割(默认) set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; // Map端输入,不合并 一个文件起一个Map 复制代码
-
控制map任务数量
1)减少mapper数可以通过合并小文件来实现,增加mapper数可以通过控制上一个reduce。
输入文件总大小:total_size hdfs设置的数据量大小:dfs_block_size default_mapper_num=total_size/dfs_block_size set mapred.map.tasks=10; 复制代码
2)那如果我们需要减少mapper数量,但是文件大小是固定的,那该怎么办呢? 可以通过mapred.min.split.size设置每个任务处理的文件的大小,这个大小只有在大于dfs_block_size的时候才会生效。
split_size=max(mapred.min.split.size, dfs_block_size) split_num=total_size/split_size compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks)) 复制代码
3)总结一下控制mapper个数的方法:
(1)如果想增加mapper个数,可以设置mapred.map.tasks为一个较大的值 (2)如果想减少mapper个数,可以设置maperd.min.split.size为一个较大的值 (3)如果输入是大量小文件,想减少mapper个数,可以通过设置 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;合并小文件。 复制代码
-
控制reducer数量
1)如果reducer数量过多,一个reducer会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个job的输入,则会出现小文件需要进行合并的问题,而且启动和初始化reducer需要耗费和资源。
2)如果reducer数量过少,这样一个reducer就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。
默认情况下,hive分配的reducer个数由下列参数决定: 参数1:hive.exec.reducers.bytes.per.reducer(默认1G) 参数2:hive.exec.reducers.max(默认为999) reducer的计算公式为: N=min(参数2, 总输入数据量/参数1) 可以通过改变上述两个参数的值来控制reducer的数量,也可以通过 set mapred.map.tasks=10; 直接控制reducer个数,如果设置了该参数,上面两个参数就会忽略。 复制代码
-
group by数据倾斜优化
在实际业务中,通常是数据集中在某些点上,这样在进行数据分组的时候,某一些组上数据量非常大,而其他的分组上数据量很小,在MapReduce程序中,同一个分组的数据会分配到同一个reduce上进行操作,这样会导致某一些reduce压力很大,一些reduce压力很小,这就是数据倾斜,整个job执行时间取决于那个执行最慢的那个reduce。
set hive.groupby.skewindata=false; //决定 group by 操作是否支持倾斜的数据。注意:只能对单个字段聚合 当上面选项设置为true的时候,生成的查询任务会生成两个MapReduce Job。 第一个Job,map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的 group by key有可能被分发到不同的reduce中,从而达到负载均衡的目的; 第二个Job再根据预处理的数据结果按照 group by key分布到reduce中,这个过程可以保证相同的 group by key被分布到同一个reduce中,最后完成最终的聚合操作。 复制代码
-
JVM重用
1)hadoop默认配置是使用派生JVM来执行Map和Reduce任务的,JVM的启动过程会造成相当大的开销。尤其是执行的job包含成千上万个task任务的情况。
2)JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在hadoop的配置文件mapred-site.xml文件中进行配置。
set mapred.job.reuse.jvm.num.tasks=20; 复制代码
3)JVM重用也是有缺点的,开启JVM重用会一直占用使用到的task的插槽,以便进行重用,知道任务完成后才会释放。 如果某个不平衡的job中有几个reduce task执行的时间要比其他的reduce task消耗的时间要多得多的话,那么保留的插槽就会一直空闲却无法被其他的job使用,直到所有的task都结束了才会释放。
-
列式存储
创建表的时候,可以设置成orc/parquet列式存储格式。因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive在查询的时候只会遍历需要的列数据,从而可以大大减少处理的数据量。
7 总结
hive优化请优先过滤数据,启用mapjoin,Map端聚合优化,group by数据倾斜优化,JVM重用相对更有意义。
秦凯新 于深圳
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- immer.js 实战讲解
- js 防抖实战讲解
- 实战大数据平台开发架构讲解
- 实战讲解:如何用Python搭建一个服务器
- 跨域认证解决方案-JSON WEB TOKEN讲解与实战
- 【白瞟版】分布式商城系统,最牛逼技术栈,实战讲解407个视频,含源码
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。