内容简介:01引言
01
引言
当前,业务互联网大厂基于Spark计算引擎在大数据离线计算领域所占的比例越来越高。 结合当前我们自身所面临的资源紧张,我们有必要有步骤的将传统的任务从MR计算引擎引擎切换到Spark计算引擎。
首先我们从如下两种计算引擎的基本架构图来分析,
分别为:Spark/MapReduce的性能
1.1
MapReduce
处理效率低效 :
-
Map/Reduce任务中间结果写磁盘,多个MR之间通过HDFS交换数据,任务调度和启动开销大;
-
一条 SQL 语句经常被拆分成多个Application,数据在多个Application之间只能通过读写HDFS交换;
-
无法充分利用内存。
1.2
S park
高效(比MapReduce快几倍到几十倍)
-
内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销,另外为了解决纯内存计算带来的数据可靠性,引入了Checkpoint机制;
-
DAG引擎,减少多次计算之间中间结果写到HDFS的开销;
-
Executor使用线程池模型来减少task启动开销,shuffle过程中避免 不必要的sort操作以及减少磁盘IO操作。
1.3
Spark Engine架构图
-
Spark Core (Spark基本数据结构)
-
Spark Streaming (微批处理)
-
MLib (机器学习)
-
GraphX (图计算)
-
Spark SQL (SQL结构化语言处理)
1.4
Spark SQL 应用场景
Spark SQL的使用场景广泛:传统的数据库,NoSQL数据库,大数据领域的其他存储系统都可以使用Spark SQL访问
02
影响 Spark任务快慢的因素
-
数据量 (GB级 vs TB级)
-
数据组织形式 (存储结构,压缩算法,数据Schema[Map等复杂结构])
-
小文件 (很多的KB级,10M级)
-
内存 (内存偏少,大量溢写)
-
Core个数 (并发量小,shuffle等待)
-
算法 (聚合因子,过滤条件,SQL组织形式 )
-
发生大量的Shuffle (可用Broadcast替换)
-
是否使用缓存 (将经常访问的数据缓存在Executor内存中)
2.1
Executor
建议不要自己配置Executor个数,使用 动态分配模式 :
概念 :
根据当前的负载动态的增加或者删除Executor,这样做的好处在于:
在业务组的队列资源 (vcore, memory) 资源恒定情况下,能更好的均衡各个业务的对资源的占用。也就是对于一个计算量较小的任务不用占用太多资源,而对于一个计算量较大的任务,也能从集群中获取相对较多的资源。
而采用指定模式,则会导致任务在获取足够多 (可通过参数设置比例) 的Executor之前一直处于等待状态,而这通常会浪费计算资源。
2.1.1Executor动态分配模型:
ExecutorAllocationManager内部会定时根据工作负载计算所需的Executor数量:
-
如果任务对Executor需求数量大于之前向集群管理器申请的Executor数量,那么向Yarn申请添加Executor;
-
如果任务对Executor需求数量小于之前向集群管理器申请的Executor数量,那么向Yarn申请取消部分Executor;
-
ExecutorAllocationManager内部还会定时向Yarn申请移除(杀死)过期的Executor。
2.1.1Executor固定分配:
在执行job之前,executor资源申请到的数量要达到 80%(默认),可通过参数:
2.2
Core
spark.executor.cores (默认值1) ,在默认情况下spark.task.cpus (每个task使用的core个数也为1)
-
建议executor的cpu core数量设置为2 ~ 3个比较合适
(同时伴随需要调整 spark.executor.memory);
-
在队列有大量任务提交的情况下,还要更少,以免影响其他用户提交的任务因申请不到cpu资源而卡主。
2.3
Memory
2.3.1 统一内存模型
2.3.2 spark.executor.memory
建议: 每个Executor的每个core分配的内存设置4g较为合适。用户设置该值的时候需要考虑如下影响因子:
-
自己使用的executor-memory * num-executor所使用的资源不能超过所提交队列的阈值;
-
在队列资源共用的模式下,所申请的资源还要更小,以免申请不到资源或者阻塞其他用户的任务;
-
用户申请的executor-momory不能超过yarn设置的最大值,当前设置的最大值为60g。
Storage Memory 这片内存区域是为了解决:
block cache (Rdd.cache, rdd.persist等方法) ,还有就是broadcasts,以及task results的存储。 可以通过参数设置,如果你大量调用了持久化操作或广播变量,那可以适当调高它; 参数:
spark.storage.memoryFraction (默认值:0.6)
Execution Memory 这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer; 参数:
spark.shuffle.memoryFraction (默认值:0.2)
User Memory:应用程序本身执行需要的内存
2.3.2.1 统一内存模型动态调节机制
根据应用的不同可自己动态调整,但通常情况下不需要调整,使用默认值即可。上图展示的是Storage内存与Execution的内存动动态调节机制。
2.3.3 spark.executor.memoryOverhead
主要用于JVM自身,字符串, NIO Buffer等开销:
-
拉取远端的RDD Block;
-
RDD.persist(StorageLevel.DISK_ONLY)
RDD.persit(StorageLevel.MEMORY_AND_DISK)
RDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
等含有disk level的cache RDD操作
对于Driver: 拉取Executor端Task Result数据回Driver节点时, 此处消耗的DirectMemory内存 = conf.getInt("spark.resultGetter.threads", 4) * TaskResultSize
2.4
Shuffle并行度
2.4.1spark.default.parallelism
该参数用于设置每个stage的默认task数量,这个参数极为重要,如果不设置可能会直接影响你的任务性能。 (只有在处理RDD时才会起作用,对Spark SQL无效)
建议: 500 ~ 1000较为合适。
2.4.2spark.sql.shuffle.partitions
用于配置 join 或聚合操作shuffle数据时使用的分区数(则是对sparks SQL专用的设置,目前不用自己设置,使用:
2.5
存储结构
目前HADOOP中常用的数据存储结构包括:
Text (行式存储)
CSV (行式存储)
RCFile (列式存储)
ORC (列式存储)
Parquet (列式存储)
目前Spark默认存储的格式为Parquet。 下图展示的是相同数据以不同存储结构存储,存储文件的Size对比:
2.5.1列式存储的好处:
-
查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列,这样可以将I/O消耗降低N倍,另外可以保存每一列的统计信息 (min、max、sum等) ,实现部分的谓词下推;
-
由于每一列的成员都是同构的,可以针对不同的数据类型使用更高效的数据压缩算法,进一步减小I/O;
-
由于每一列的成员的同构性,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效;
-
由于列式存储数据量更小,Spark的Task读取数据的时间更短,不光节省计算资源,还节省存储资源。
建表是可通过如下方式指定存储格式:
CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET
2.5.2 向量化读
列式存储的向量化操作,相对于行式存储一行一行的操作,列式存储可做到一个batch一个batch的操作,这样的操作方式极大的提升了运算性能。
从Spark 2.3开始,Spark使用新ORC文件格式的向量化的ORC reader来支持ORC文件。为此,新添加了以下配置。
当spark.sql.orc.impl设置为native,并且spark.sql.orc.enableVectorizedReader设置为true时:
向量化reader用于原生ORC表对于Hive ORC serde表,
当spark.sql.hive.convertMetastoreOrc也设置为true时,使用向量化reader。
原生ORC表 , 例如:
使用USING ORC子句创建的表
Hive ORC serde表 , 例如:
使用USING HIVE OPTIONS(fileFormat'ORC')子句创建的表
2.5.2.1 向量化读限制
对于不同的数据存储格式需要满足不同的条件才能使用向量化阅读
ORC格式 , 需要满足以下条件:
-
开启spark.sql.orc.enableVectorizedReader: 默认true;
-
开启spark.sql.codegen.wholeStage: 默认true, 并且其scheme的长度不大于wholeStageMaxNumFields(默认100列), 参数:spark.sql.codegen.maxFields,可设置;
-
[关键]所有列数据类型需要为AtomicType类型的。
Parquet格式 , 需要满足以下条件:
-
开启spark.sql.orc.enableVectorizedReader: 默认true;
-
开启spark.sql.codegen.wholeStage: 默认true;
-
[关键]所有列数据类型需要为AtomicType类型的。
AtomicType 是指:
Map,Array,Object,UDT等复杂结构体之外的类型。
2.6
压缩方式 (gzip, bzip2, lzo, snappy)
通过上述两张图得出以下结论:
-
CSV,Text的低效性;
-
压缩比不压缩性能高;
-
不同压缩格式性能有所差异 。
2.6.1 snappy压缩前后对比
Snappy压缩前后比例为3:1
2.6.2 压缩优点
对于Spark任务来说,压缩的数据带来的好处是显而易见的:
-
大幅节省内存
-
大幅节省磁盘
-
大幅节省数据读取时间
2.6.3 设置压缩格式
Text 格式
Parquet格式
2.7
小文件
所谓小文件,我们定义为小于一个HDFS Block大小一下的文件。
案例1:
2.7.1 缺点
-
小文件太多,导致每个task读取的数据量较小,计算的时间很短;
-
执行的时间不足以弥补JVM启动的时间;
-
由于集群中NameNode节点需要维护文件的元数据信息,太多的输出小文件会给集群的NameNode带来巨大的压力;
2.7.2 解决方案
可通过控制task个数的方式来对输出数据重分区,通过这种方式可以达到减少或者扩大task个数的目的,从而控制输出文件数量。
2.8
控制task个数
2.8.1 Repartition
使用hint将会使得输入数据进行重新Repartition,调节最终task的个数以及输出文件的个数:
通过重分区将减少或者增大分区数量以达到增加或减少task的数量,从而增大或者减少Task输出的文件个数。
案例:
1.没有repartition
时间:
2.repartition
时间:
2.8.2 控制每个task处理的数据量
Orc, parquet格式
其他格式
通过调整上述参数可调整每个task处理数据的大小,从而调整task的个数。调整这些会对任务的执行性能带来一些改变,也能在一定程度上解决小文件问题。
2.8
下推 (PushDownPredicate)
上图中:
-
方式1从磁盘中读取出所有的数据,在内存中过滤;
-
方式2,3将过滤从内存中下推到磁盘,在扫描磁盘的数据的时候就过滤掉数据。
概念: 所谓下推是指将过滤尽可能地下沉到数据源端,从而避免从磁盘读取不必要数据。
下推与不下推性能对比 (DataBricks官方)
通过上图可知: 60%的下推比不下推的性能提高了2~18倍 。
2.9.1 SQL分区裁剪
目前Spark 在第一次读取Hive表数据时,会全量查询一次Hive数据以便拿到元数据信息,这样会list所有分区,这是个社区的bug已经在修复中。
2.9.2 谓词下推的限制
-
只有operator 包含的所有expression都是确定性的时候才可以下推, 比如 rand 表达式等等;
-
Filter 的字段必须要在group by 的维度字段里面,举个例子:
①下面的聚合是 可以谓词下推 的:
select a, count(*) as c from t1 group by a where a ==“1"
②下面的聚合是 不可以谓词下推 的:
select count(*) as c from t1 where c == “10”
select a, count(b) as c from t1 group by a where c == “10“
案例 :
Regexp是 非确定 的,导致下推失败
regexp导致非确定性
确定性
二者的执行计划不同--不带regexp的SQL的执行计划显示在数据源扫描阶段执行过滤数据的操作;而带有regexp的SQL的执行计划显示是先把所有的数据都扫描完了才做过滤。
2.10
广播
对该变量 (表) 进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
可适当调整广播变量大小的阈值,使得稍微大一些的数据也能被广播:
案例:
2.10.1 关闭Broadcast
耗时:
2.10.2 开启Broadcast
耗时:
通过案例可以看出: 开启Broadcast比不开启 B roadcast , 程序性能提升 1倍。
2.11
缓存
2.11.1 缓存原则
将数据缓存在内存中, 遵循的原则:
-
数据重复使用
-
重新生成这部分数据的代价昂贵
权衡cache与否的代价,不cache则多次使用同一份数据都需要重新计算一次。Cache则只会计算一次,但是会占用executor的内存资源,那是否应该cache就是把计算RDD,从hdfs上获取数据的时间资源与缓存数据的内存资源之间进行权衡。
rdd1,rdd2不需要缓存
rdd可以缓存,rdd1,rdd2不需要缓存
2.11.2 使用方式
对View缓存:
2.12
表结构嵌套
Spark SQL 处理嵌套类型数据时,存在以下问题:
-
读取大量不必要的数据:
对于 Parquet / ORC 等列式存储格式,可只读取需要的字段,而直接跳过其它字段,从而极大节省 IO。而对于嵌套数据类型的字段,如下图中的 Map 类型的 people 字段,往往只需要读取其中的子字段,如 people.age。却需要将整个 Map 类型的 people 字段全部读取出来然后抽取出 people.age 字段。这会引入大量的无意义的 IO 开销。如果是几百个 Key,这也就意味着 IO 被放大了几十至几百倍。
-
无法进行向量化读取:
而向量化读能极大的提升性能。但截止到目前, Spark 不支持包含嵌套数据类型的向量化读取。这极大地影响了包含嵌套数据类型的查询性能。
-
不支持 Filter 下推:
Spark 不支持嵌套类型字段上的 Filter 的下推。
-
重复计算:
JSON 字段,在 Spark SQL 中以 String 类型存在,严格来说不算嵌套数据类型。不过实践中也常用于保存不固定的多个字段,在查询时通过 JSON Path 抽取目标子字段,而大型 JSON 字符串的字段抽取非常消耗 CPU。对于热点表,频繁重复抽取相同子字段非常浪费资源。
案例1:
这张表是业务用户的表结构,用户行为数据以Json形式上报。由于表的结构实在太过复杂,Column字段存在大量的Map结构,分析层面很难通过简单的SQL语句来分析这行数据,只能以读取HDFS,在代码层面来做数据分析。
案例2:
用户需要查询某个年龄段的人群,第一张表需要读取people整个struct,而第二张表则只需要基于age过滤。
2.13
SQL标准化
SQL语句中的Column类型一定要与Hive数仓中的表的Column类型保持一致,Spark SQL相对Hive SQL来说对于语法的检查更为严格。
案例 :
Partition中dayno是String类型,此处是Int类型,导致需要做转换,耗费CPU计算资源。
查询值类型与Hive表字段类型一致
耗时
查询值类型与Hive表字段类型不一致
二者执行计划不一样,如果字符串不匹配会先去做cast类型转换,然后才比较。
03
结语
通过不断的优化SQL,优化表结构,优化数据存储格式等等措施,一定能让Spark SQL的性能得到极大的提升。
投稿 | 大数据平台
编辑 | sea
排版 | sea
往期推荐
在看点一下 大家都知道
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark查询优化案例分享
- 技术资讯 | Spark sql优化案例分享
- 代码审计之Fiyo CMS案例分享
- 从零快速搭建Next框架案例分享
- Android组件化开发实践和案例分享
- 西瓜创客 x Leangoo敏捷实践案例分享
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Clean Architecture
Robert C. Martin / Prentice Hall / 2017-9-20 / USD 34.99
Practical Software Architecture Solutions from the Legendary Robert C. Martin (“Uncle Bob”) By applying universal rules of software architecture, you can dramatically improve developer producti......一起来看看 《Clean Architecture》 这本书的介绍吧!
XML 在线格式化
在线 XML 格式化压缩工具
html转js在线工具
html转js在线工具