内容简介:Enzyme是挖财数据团队自研的SQL执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用Ja
Enzyme是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。
应用
Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用SQL作为一种加工变量的DSL,提供在离线和实时两个平台上的一致语义。
为什么要使用SQL呢?首先,自研DSL需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的DSL最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。
Enzyme SQL引擎极致的性能表现和非常低的CPU占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。
实践
Enzyme设计之初就是以兼容Spark SQL为目标的,故而在使用上,和Spark SQL的API大体是一致的。EnzymeSession即SparkSession,Protein即Dataframe。
我们从构建一个Protein数据集开始:
// a session for computing val conf = new EnzymeConf val session = new EnzymeSession(conf) // construct a protein from rows and schemas val schema = StructType(Seq( StructField("x", LongType), StructField("y", StringType), StructField("z", DoubleType), StructField("in", IntegerType) )) val rows = Seq(Row(1L, "234L", 1.1, 12), Row(2L, "23L", 23.4, 4245), Row(2L, "65L", 5244.2, 234), Row(null, "7L", 245.234, 5245), Row(4L, "7L", 245.234, 5245)) val df = new Protein(session, rows, schema) 复制代码
这样的一个数据集可以直接展示:
> df.show() +----+----+-------+----+ | x| y| z| in| +----+----+-------+----+ | 1|234L| 1.1| 12| | 2| 23L| 23.4|4245| | 2| 65L| 5244.2| 234| |null| 7L|245.234|5245| | 4| 7L|245.234|5245| +----+----+-------+----+ 复制代码
如果要使用SQL,首先我们要把这个数据集和一个表名关联起来:
> session.register(tableName = "a", df) > session.sql("select * from a").show() +----+----+-------+----+ | x| y| z| in| +----+----+-------+----+ | 1|234L| 1.1| 12| | 2| 23L| 23.4|4245| | 2| 65L| 5244.2| 234| |null| 7L|245.234|5245| | 4| 7L|245.234|5245| +----+----+-------+----+ 复制代码
上面的代码中 session.sql()
的结果还是一个Protein。除了使用SQL,我们还可以使用Protein里面丰富的API:
> session.sql("select * from a order by x asc").show() +----+----+-------+----+ | x| y| z| in| +----+----+-------+----+ |null| 7L|245.234|5245| | 1|234L| 1.1| 12| | 2| 23L| 23.4|4245| | 2| 65L| 5244.2| 234| | 4| 7L|245.234|5245| +----+----+-------+----+ > df.sort("x").show() +----+----+-------+----+ | x| y| z| in| +----+----+-------+----+ |null| 7L|245.234|5245| | 1|234L| 1.1| 12| | 2| 23L| 23.4|4245| | 2| 65L| 5244.2| 234| | 4| 7L|245.234|5245| +----+----+-------+----+ 复制代码
更多用法的细节可以查看Spark SQL的文档,也可以查看Enzyme的文档。
实现
Enzyme基于Spark Catalyst实现,而Catalyst对标的开源项目是Apache Calcite。Apache Phoenix和Apache Hive等众多项目都在使用Calcite。因为我们的目标是兼容Spark SQL,自然而然选择了Catalyst,作为SQL的解析器、逻辑计划的执行器和优化器。
Spark Catalyst概览
- SQL Text
- (parse): Unresolved Logical Plan
- (analyze): Resolved Logical Plan
- (optimize): Optimized Logical Plan(s) ----- RBO
- (planning): Physical Planning ------ CBO
- (optimize): Optimized Logical Plan(s) ----- RBO
- (analyze): Resolved Logical Plan
- (parse): Unresolved Logical Plan
上面的层次结构简明地概括了一个SQL从最原始的SQL文本,到最后执行的各个阶段。其中加粗的部分是Enzyme中所实现的,未加粗的部分是Catalyst所提供的功能。
解析,就是用Antlr4将SQL文本变成一棵AST树,这个AST树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道 *
所表示的字段究竟是哪些。
分析,就是结合Catalog中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如 *
)和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出AnalysisException。
优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过Catalyst提供的接口,将自己研发的优化规则加入其中。这里的优化就是RBO,基于规则的优化。
最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的CBO,基于代价的优化。
一个SQL的一生
SELECT * FROM employee INNER JOIN department ON employee.DepartmentID = department.DepartmentID 复制代码
我们用上面这个SQL来详细了解一下上述各个阶段。
分析阶段
Project [*] +- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID) :- 'UnresolvedRelation `employee` +- 'UnresolvedRelation `department` Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1] +- Join Inner, (DepartmentID#7L = DepartmentID#0L) :- SubqueryAlias employee : +- LocalRelation [LastName#6, DepartmentID#7L] +- SubqueryAlias department +- LocalRelation [DepartmentID#0L, DepartmentName#1] 复制代码
我们看到 *
已经被展开成了四个明确的字段,而且每个字段都有明确的ID标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对Spark SQL做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。
优化
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1] +- Join Inner, (DepartmentID#7L = DepartmentID#0L) :- SubqueryAlias employee : +- LocalRelation [LastName#6, DepartmentID#7L] +- SubqueryAlias department +- LocalRelation [DepartmentID#0L, DepartmentName#1] Join Inner, (DepartmentID#7L = DepartmentID#0L) :- Filter isnotnull(DepartmentID#7L) : +- LocalRelation [LastName#6, DepartmentID#7L] +- Filter isnotnull(DepartmentID#0L) +- LocalRelation [DepartmentID#0L, DepartmentName#1] 复制代码
因为这是一个inner join,所以这里的一个优化点其实是在做join之前,把join key为null的行过滤掉。
物理计划的生成
我们模仿Spark SQL中SparkPlan的实现,提供了简化的EnzymePlan:
abstract class EnzymePlan extends QueryPlan[EnzymePlan] { def iterator: Iterator[InternalRow] override def output: Seq[Attribute] ... } trait LeafExecNode extends EnzymePlan { override final def children: Seq[EnzymePlan] = Nil } trait UnaryExecNode extends EnzymePlan { def child: EnzymePlan override final def children: Seq[EnzymePlan] = child :: Nil } trait BinaryExecNode extends EnzymePlan { def left: EnzymePlan def right: EnzymePlan override final def children: Seq[EnzymePlan] = Seq(left, right) } 复制代码
在这个代码片段中,EnzymePlan是核心,其中output表示一个物理计划的节点上结果集的元数据信息,而iterator则是调用这个物理计划节点的入口。我们看到有三类物理计划:
- LeafExecNode : LocalTableScanExec, LazyLocalTableScanExec
- UnaryExecNode : ProjectExec, LimitExec, FilterExec
- BinaryExecNode : HashJoinExec, NestedLoopExec
Enzyme中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是Union或者Join,而只有一个输入源的就是Projection,Filter,Sort等算子。
上一节中优化之后的逻辑计划可以生成这样的物理计划:
HashJoinExec [DepartmentID#11L], [DepartmentID#4L] , BuildRight, Inner :- FilterExec isnotnull(DepartmentID#11L) : +- LazyLocalTableScan [LastName#10, DepartmentID#11L], employee, catalog@60dcf9ec +- FilterExec isnotnull(DepartmentID#4L) +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5], department, catalog@60dcf9ec 复制代码
计算通过在根节点调用iterator方法,层层回溯:
HashJoinExec.iterator + FilterExec.iterator + LazyLocalTableScan(employee).iterator + FilterExec.iterator + LazyLocalTableScan(department).iterator 复制代码
性能调优
首先,我们需要定位性能瓶颈。JVM生态中有很多做Profiling的工具。Enzyme在优化过程中,使用的是JDK中自带的jmc命令和FlightRecord。通过jmc的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。
- 其一,直接替换掉慢的部分
- 其二,对无法优化的部分做必要的缓存
- 其三,逻辑计划优化
优化点一:动态代码生成调优
Spark的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料2):
- 大量虚函数调用,生成的实际代码不再需要执行表达式系统中统一定义的虚函数
- 判断数据类型和操作算子等内容的大型分支选择语句
- 常数传播限制,生成的代码能够确定性地折叠常量
对于Enzyme的使用场景,动态代码生成并不一定有性能优化的效果,我们使用JMH做基准测试,将一部分使得性能变差的代码生成关闭掉。
数以千计的SQL会生成大量Java类,在引擎中编译并缓存,会带来一些内存上的占用和CPU的消耗,也是我们做取舍的其中一个原因。
优化点二:缓存
我们做的最主要的缓存就是从Unresolve Logical Plan到Physical Plan的生成。为什么不是直接从SQL到Physical Plan呢?因为SQL解析的开销实际上很小,而且略有差异的SQL所生成的Unresolved Logical Plan可能是一模一样的。
在物理计划的缓存中,还有两点需要注意:
- 其一,物理计划必须和数据隔离
- 其二,物理计划的计算不能有副作用
只有这样,我们的缓存才是有效的、正确无误的。另外,在表的schema发生改变的时候,我们还需要让所缓存的相关物理计划失效。
优化点三:新增逻辑计划优化规则
Catalyst中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks在Spark Summit上做过一个题为A Deep Dive into Spark SQL's Catalyst Optimizer的讲座,其中有细节的介绍。
具体的接口如下:
spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil 复制代码
我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。
Enzyme的未来
- 开源
- 做更多针对小数据集的优化,进一步改善性能
- 基于Enzyme,做一些上层生态的扩展
对于第三点,我们想做的实际上是让Enzyme和其他生态更好地结合。比如如何将Enzyme运用到Spark Streaming或者Flink Streaming中,如何在Spring Boot中更加方便地使用Enzyme,如何在机器学习中使用Enzyme。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 模板引擎实现(一)词法分析
- 一步步实现 Redis 搜索引擎
- InnoDB存储引擎MVCC实现原理
- 500行 python 代码实现模板引擎
- 一步步实现 Redis 搜索引擎
- KOA的简易模板引擎实现方式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入理解程序设计
[美] Jonathan Bartlett / 郭晴霞 / 人民邮电出版社 / 2014-1 / 49.00
是否真正理解汇编语言,常常是普通程序员和优秀程序员的分水岭。《深入理解程序设计:使用Linux汇编语言》介绍了Linux平台下的汇编语言编程,教你从计算机的角度看问题,从而了解汇编语言及计算机的工作方式,为成就自己的优秀程序员之梦夯实基础。 很多人都认为汇编语言晦涩难懂,但New Medio技术总监Jonathan Bartlett的这本书将改变人们的看法。本书首先介绍计算机的体系结构,然后......一起来看看 《深入理解程序设计》 这本书的介绍吧!
Markdown 在线编辑器
Markdown 在线编辑器
RGB HSV 转换
RGB HSV 互转工具