内容简介:Spark原本预计在2.3版本实现聚合下推,虽然不知道是何原因最终没有能够在2.3版本最终实现,但是因为工作需要,必须要从聚合函数下手优化Spark SQL,遂思考之实现之。网上有个牛人想在2.2版本实现聚合下推并提交代码到Spark,结果在pull request里被拒绝了,Spark的人说他们在2.3会实现一套新的DataSource API,即DataSource API v2,所以让他不要这么执着于提交这个代码,我也是很醉。。。这里贴出他的博文:
Spark原本预计在2.3版本实现聚合下推,虽然不知道是何原因最终没有能够在2.3版本最终实现,但是因为工作需要,必须要从聚合函数下手优化Spark SQL,遂思考之实现之。
一篇有意义的参考文章
网上有个牛人想在2.2版本实现聚合下推并提交代码到Spark,结果在pull request里被拒绝了,Spark的人说他们在2.3会实现一套新的DataSource API,即DataSource API v2,所以让他不要这么执着于提交这个代码,我也是很醉。。。这里贴出他的博文:
该大牛是基于物理计划实现的下推,局限性比较大。所以我参考了他的思路,从逻辑计划和物理计划两个方面都做了一些优化。这里只讲逻辑计划的下推。下推,必然最后是推到数据源层,而Spark没有实现DataSource的聚合数据源的接口,这里可以参考下刚刚分享博文实现的 AggregatedFilteredScan
接口,我也是基于这个接口的做法实现的。
下推的意义
无论是传统的谓词下推,还是聚合下推,意义都在于将一些操作推到数据源层,这样从数据源里返回的数据就会极大减少。磁盘读写和网络开销都会降低,性能会得到提升。
难点的实现思路
聚合下推的最大难点,我认为是遇到了join,当join的on的两列不属于group列或者aggregate列该肿么办。最开始我认为这种情况可能没有办法下推,因为这样势必要在在group列中加上了原本不属于group的某一join列,这样会影响聚合的结果并且会多一次聚合。但是经过大神提点,其实这样也是可以下推的,原因有二:
- 即使多一个聚合节点,SQL执行的结果也是对的,也就是最终结果来看其实不应该聚合结果。
- 一般来说,join on的两列不可能有相同的行数,如果行数相同,那么按照数据库的设计规范,这两张表就应该Union成一张表。所以多的这个聚合节点,也是会减少数据源的数据传输的。
这两点在后面的例子会有展示。
一个下推到join的简单思路以及结果
数据源准备
使用上一篇文章的Scott数据源。
初始 SQL 以及逻辑计划
SQL:
SELECT AVG(salary), deptName FROM emp JOIN dept ON emp.deptNo = dept.deptNo GROUP BY deptName;
LogicalPlan:
Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8] +- Project [salary#6, deptName#8] +- Join Inner, (deptNo#5 = deptNo#7) :- Project [deptNo#5, salary#6] : +- Filter isnotnull(deptNo#5) : +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true))) +- Project [deptNo#7, deptName#8] +- Filter isnotnull(deptNo#7) +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
逐步下推
- Step 1,指针在最上层,aggregate节点推到其child节点project的下面,同时将project里的salary#6替换成avg(salary)#19:
Project [avg(salary)#19, deptName#8] +- Aggregate [deptName#8], [avg(cast(salary#6 as double)) AS avg(salary)#19, deptName#8] +- Join Inner, (deptNo#5 = deptNo#7) :- Project [deptNo#5, salary#6] : +- Filter isnotnull(deptNo#5) : +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true))) +- Project [deptNo#7, deptName#8] +- Filter isnotnull(deptNo#7) +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
- Step 2,指针在第二层,这次是下推一层后的aggregate节点,搜索join节点下面的左右子project节点,看哪个有salary#6,往salary#6所在的子节点上方添加一个以join字段为group by条件的聚合节点,假设新生成exprId是20:
Project [avg(salary)#19, deptName#8] +- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8] +- Join Inner, (deptNo#5 = deptNo#7) :- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5] : +- Project [deptNo#5, salary#6] : +- Filter isnotnull(deptNo#5) : +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true))) +- Project [deptNo#7, deptName#8] +- Filter isnotnull(deptNo#7) +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
- Step 3,指针在第4层的aggregate,与1类似,将aggregate推到project下方,并将project中的salary#6替换成salary#20:
Project [avg(salary)#19, deptName#8] +- Aggregate [deptName#8], [avg(cast(salary#20 as double)) AS avg(salary)#19, deptName#8] +- Join Inner, (deptNo#5 = deptNo#7) :- Project [deptNo#5, salary#20] : +- Aggregate [deptNo#5], [avg(cast(salary#6 as double)) AS salary#20, deptNo#5] : +- Filter isnotnull(deptNo#5) : +- Relation[empNo#2,empName#3,mgr#4,deptNo#5,salary#6] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true))) +- Project [deptNo#7, deptName#8] +- Filter isnotnull(deptNo#7) +- Relation[deptNo#7,deptName#8,loc#9] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
这时Aggregate-Filter-Relation的组合就会调用到上面提到的 AggregatedFilteredScan
接口,调用到数据源的buildscan()方法。
结果查看
这个下推,其实等价于下面的SQL和执行计划:
SQL:
SELECT AVG(salary), deptName FROM (SELECT AVG(salary), deptNo FROM emp GROUP BY deptNo) a JOIN dept ON a.deptNo = dept.deptNo GROUP BY deptName
LogicalPlan:
Project [avg(avgsalary)#21, deptName#9] +- Aggregate [deptName#9], [avg(avgsalary#2) AS avg(avgsalary)#21, deptName#9] +- Join Inner, (deptNo#6 = deptNo#8) :- Project [deptNo#6, avgsalary#2] : +- Aggregate [deptNo#6], [avg(cast(salary#7 as double)) AS avgsalary#2, deptNo#6] : +- Filter isnotnull(deptNo#6) : +- Relation[empNo#3,empName#4,mgr#5,deptNo#6,salary#7] TestAggregatePushdownPlan2Scan(emp,StructType(StructField(empNo,IntegerType,true), StructField(empName,StringType,true), StructField(mgr,IntegerType,true), StructField(deptNo,IntegerType,true), StructField(salary,FloatType,true))) +- Project [deptNo#8, deptName#9] +- Filter isnotnull(deptNo#8) +- Relation[deptNo#8,deptName#9,loc#10] TestAggregatePushdownPlan2Scan(dept,StructType(StructField(deptNo,IntegerType,true), StructField(deptName,StringType,true), StructField(loc,StringType,true)))
在没有聚合下推的情况下,返回的join两侧的数据源是:
data without aggregate function: List(ListBuffer(20, 800.0), ListBuffer(20, 3000.0), ListBuffer(20, 2975.0), ListBuffer(30, 1600.0), ListBuffer(30, 1250.0), ListBuffer(30, 2950.0), ListBuffer(10, 5000.0)) data without aggregate function: List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))
而在下推的情况下,返回的join两侧数据源是:
data with aggregate function: ListBuffer(List(30, 5800.0, 3), List(20, 6775.0, 3), List(10, 5000.0, 1)) data without aggregate function: List(ListBuffer(10, accounting), ListBuffer(20, research), ListBuffer(30, sales), ListBuffer(40, operations))
可以看到,下推和没下推,在一侧数据源中拿到的数据有明显减少。这还只是数据量在不到20的情况下。在数据量大的情况下,那么聚合下推效果会更好。
二者查询结果都是:
+------------------+----------+ |avg(salary) |deptName | +------------------+----------+ |1933.3333333333333|sales | |5000.0 |accounting| |2258.3333333333335|research | +------------------+----------+
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Data Source V2 聚合下推
- 【大数据】SparkSql连接查询中的谓词下推处理(一) 原 荐
- 【大数据 】SparkSQL连接查询中的谓词下推处理(二) 原 荐
- Spark SQL 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推
- 监控聚合器系列之: open-falcon新聚合器polymetric
- elasticsearch学习笔记(七)——快速入门案例实战之电商网站商品管理:嵌套聚合,下钻分析,聚合分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web Data Mining
Bing Liu / Springer / 2011-6-26 / CAD 61.50
Web mining aims to discover useful information and knowledge from Web hyperlinks, page contents, and usage data. Although Web mining uses many conventional data mining techniques, it is not purely an ......一起来看看 《Web Data Mining》 这本书的介绍吧!