【译】深入理解 Spark SQL 的 Catalyst 优化器

栏目: Scala · 发布时间: 6年前

内容简介:原文:Spark SQL 是 Spark 最新且技术最复杂的组件之一。它同时支持 SQL 查询和新的 DataFrame API。Spark SQL 的核心就是 Catalyst 优化器,它以一种全新的方式利用高级语言的特性(例如:Scala 的模式匹配和 Quasiquotes ①)来构建一个可扩展的查询优化器。最近我们发表了一篇

原文: Deep Dive into Spark SQL’s Catalyst Optimizer

Spark SQL 是 Spark 最新且技术最复杂的组件之一。它同时支持 SQL 查询和新的 DataFrame API。Spark SQL 的核心就是 Catalyst 优化器,它以一种全新的方式利用高级语言的特性(例如:Scala 的模式匹配和 Quasiquotes ①)来构建一个可扩展的查询优化器。

最近我们发表了一篇 论文 将在 SIGMOD 2015 (合作者:Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin 和 Ali Ghodsi)。在本篇博文中,我们将重新发表论文中的部分内容,为广大读者解释 Catalyst 优化器的内部原理。

为了实现 Spark SQL,我们基于 Scala 函数式编程结构,设计了一个新的可扩展的优化器 Catalyst。Catalyst 的可扩展设计有两个目的:

  • 首先,我们希望能够非常容易地为 Spark SQL 添加新的优化技术和特性,尤其是为了应对我们遇到的大数据中的各种问题(例如:半结构化数据和高级分析);
  • 其次,我们希望外部的开发者可以扩展优化器。例如,为数据源添加特定的规则从而使过滤或聚合操作下推到外部的存储系统,或者支持新的数据类型。Catalyst 同时支持基于规则和基于成本的优化。

Catalyst 核心是一个树连同操作树的应用规则的通用库。框架顶层,我们构建了专门用于关系型查询处理的库(例如,表达式,逻辑查询计划),以及处理查询执行不同阶段的几组规则:分析,逻辑优化,物理计划和将部分查询编译为 Java 字节码的代码生成。对于后者,我们使用了另一个 Scala 特性 Quasiquotes,这让在运行时从组合表达式生成代码机器变得非常容易。最后,Catalyst 提供了若干公共的扩展点,包括扩展数据源和用户自定义类型。

Catalyst 主要的数据类型是由节点对象构成的树。每个节点有一个节点类型和零到多个子节点。新的节点类型在 Scala 中被定义为 TreeNode 类的子类。这些对象是不可变的,并且可以使用函数式的转换进行操作,我们将在下一小节讨论。

举个简单的例子,假设我们有以下三个节点类,用非常简单的表达式表示:

Literal(value: Int)
Attribute(name: String)
Add(left: TreeNode, right: TreeNode)

这些类可以用于构建树;例如,表达式 x+(1+2) ,将在 Scala 代码中表示为:

【译】深入理解 Spark SQL 的 Catalyst 优化器

规则

规则用于对树进行操作,实际上是将一棵树转换为另外一棵树的方法。虽然规则可以在其输入树上运行任意的代码(假定该树只是一个 Scala 对象),但最常见的方式是使用一组模式匹配函数,找到并替换特定结构的子树。

模式匹配是许多函数式编程语言的特性,允许从代数数据类型的潜在嵌套结构中进行值提取。在 Catalyst,树提供了转换方法可以递归地应用模式匹配函数到树的所有节点。例如,我们可以实现一个常量之间叠加操作的规则:

tree.transform {

  case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)

}

应用这条规则到树 x+(1+2) 就会产生一棵新树 x+3 。在这里 case 关键字是 Scala 标准模式匹配的语法,可被用于匹配对象的类型以及命名值提取(这里是 c1 和 c2)。

被传递给转换操作的模式匹配表达式是一个偏函数 ②,这意味着它只需要匹配所有可能的输入树的子集。Catalyst 将测试规则适用于树的哪些部分,自动跳过并下降到不匹配的子树。这种能力意味着规则只需要对给定优化适用的树进行推理,而不是那些不匹配的。这样,即使添加新的操作类型到系统中,也不需要修改规则。

规则(通常是 Scala 的模式匹配)可以在相同的转换调用中匹配多个模式,这使得一次实现多个转换操作非常的简练:

tree.transform {

  case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)

  case Add(left, Literal(0)) => left

  case Add(Literal(0), right) => right

}

实践中,规则可能需要执行多次才能完全转换一棵树。Catalyst 将规则分成 batch,执行各个 batch 直到达到一个固定的点,即应用规则后树不再更新。运行规则达到固定的点意味着每条规则可以非常简单且自包含,但最终仍然会在树上产生较大的全局效果。在上面的例子中,重复地应用规则将不断折叠较大的树,如 (x+0)+(3+3) 。另外一个例子,第一个 batch 也许分析一个表达式将类型赋给所有属性,而第二个 batch 可能使用这些类型进行不断折叠。每个 batch 之后,开发者还可以在新树上运行健全性检查(例如,查看所有的属性都指定了类型),通常也同样通过递归匹配来编写。

最后,规则条件及其实现可以包含具体的 Scala 代码。这使得 Catalyst 比优化器 DSL 更加强大,同时保持了规则的简洁性。

根据我们的经验,对不可变树执行函数式转换操作让整个优化器非常易于推理和调试。同时也使得优化器的转换操作可以并行化,尽管我们还没有利用起来。

在 Spark SQL 中使用 Catalyst

我们分四个阶段使用 Catalyst 通用树转换操作框架,如下所示:

  1. 分析逻辑计划解析引用
  2. 逻辑计划优化
  3. 物理计划
  4. 代码生成,编译部分查询为 Java 字节码

【译】深入理解 Spark SQL 的 Catalyst 优化器

分析

Spark SQL 以一个需要计算的关系开始,要么来自 SQL 解析器返回的抽象语法树(AST),要么来自 使用 API 构造的 DataFrame 对象。在两种情况下,关系可能包含未解析的属性引用或关系:例如,在 SQL 查询 SELECT col FROM sales ,col 的类型,甚至是否是一个合法的列名,在我们查询表 sales 之前都是未知的。如果我们不知道其类型或者没有匹配到输入表(或别名),那么这个属性就未被解析。Spark SQL 使用 Catalyst 规则和一个 Catelyst 对象去追踪所有数据源的表来解析这些属性。从未绑定的属性和数据类型构建一个“未解析的逻辑计划”,然后应用规则执行下面的步骤:

col = col
1 + col

总共,分析器相关的规则大概 [1000 行代码]。( https://github.com/apache/spark/blob/fedbfc7074dd6d38dc5301d66d1ca097bc2a21e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala)。

逻辑优化

逻辑优化阶段对逻辑优化应用了标准的基于规则的优化方式。(执行基于成本的优化,通过使用规则生成多个计划并计算他们的成本。)包括:常量折叠(Constant Folding)、谓词下推(Predicate Pushdown)、投影裁剪(Projection Pruning)、空传递(Null Propagation)、布尔表达式简化(Boolean Expression Simplification)和其它规则。总的来说,我们发现为各种情形添加新的规则都极为简单。例如,当我们添加固定精度的 DECIMAL 类型到 Spark SQL 时,以低精度的方式优化对 DECIMAL 的如 SUM 和 AVG 的聚合操作;只要 12 行代码编写一条规则在 SUM 和 AVG 表达式中找到这样的 DECIMAL,然后将他们转换为 64 位的 LONG 类型进行聚合操作,最后将结果转换回来。下面是一个仅优化了 SUM 表达式的简化版本:

object DecimalAggregates extends Rule[LogicalPlan] {

  /** Maximum number of decimal digits in a Long */

  val MAX_LONG_DIGITS = 18

  def apply(plan: LogicalPlan): LogicalPlan = {

    plan transformAllExpressions {

      case Sum(e @ DecimalType.Expression(prec, scale))

          if prec + 10 <= MAX_LONG_DIGITS =>

        MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }

}

另外一个例子,一条 12 行的规则通过简单的正则表达式将 LIKE 表达式优化为 String.startsWith 或 String.contains 调用。在规则中使用任意 Scala 代码的自由,使得这些优化超越了模式匹配子树结构,更易于表达。

总共,逻辑优化规则大概 [800 行代码]。( https://github.com/apache/spark/blob/fedbfc7074dd6d38dc5301d66d1ca097bc2a21e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala)。

物理计划

物理计划阶段,Spark SQL 将一个逻辑计划使用匹配的 Spark 执行引擎的物理操作符生成一个或更多的物理计划。然后选择一个计划应用成本模型。此时,基于成本的优化器只用于选择连接算法:对于已知的很小的关系,Spark SQL 使用 broadcast join,使用 Spark 里可用的点对点的广播工具。框架支持更广泛的使用基于成本的优化,这是因为成本可以通过对整棵树使用规则来递归估计。所以,未来我们打算实现更丰富的基于成本的优化。

物理计划同样执行基于规则的物理优化,如在一个 Spark 的 map 操作执行流水线投影(Piplining Projection)或过滤。除此之外,还可以从逻辑计划将操作推到支持谓词或投影下推的数据源。我们将在之后的章节描述这些数据源的 API。

总共,物理计划规则大概 500 行代码

代码生成

查询优化的最后阶段涉及生成运行在各台机器上的 Java 字节码。由于 Spark SQL 通常是运行在内存数据集上,其处理受限于 CPU,因此我们希望支持代码生成来加快执行速度。然而,构建代码生成引擎非常的复杂,尤其是编译器。Catalyst 依赖于 Scala 语言特定的属性 Quasiquotes 使得代码生成更加简单。Quasiquotes 允许在 Scala 语言中使用编程的方式构建抽象语法树(ASTs),然后可以在运行时提供给 Scala 编译器生成字节码。我们使用 Calalyst 将 SQL 表达式的树转换为 Scala 代码的 AST 评估表达式,然后编译并运行生成的代码。

举一个简单的例子,回忆 4.2 节介绍的属性和字面量树节点 Add,这使得我们能够写出表达式 (x+y)+1 。如果没有代码生成,这样的表达书不得不解析每一行数据,一直走到 Add 树,属性和字面量节点。

大量的分支和虚函数调用将减慢执行速度。通过代码生成,我们可以向下面,写一个函数将特定的表达式树转换为 Scala AST:

def compile(node: Node): AST = node match {

  case Literal(value) => q"$value"

  case Attribute(name) => q"row.get($name)"

  case Add(left, right) => q"${compile(left)} + ${compile(right)}"

}

以 q 开头的字符串就是 Quasiquotes,虽然长得像字符串,但是 Scala 编译器会在编译时解析它们,并表示代码中的 ASTs。Quasiquotes 支持变量或其它 ASTs 片段拼接,使用 $ 进行表示。举个例子, Literal(1) 变成了 Scala AST 中的 1,而 Attribute("x") 变成了 row.get("x") 。最后,像是 Add(Literal(1), Attribute("x")) 的树变成了 Scala 表达式 AST 1+row.get("x")

Quasiquotes 会在编译时进行类型检查以确保只有合适的 ASTs 或者字面量能够被替换,这比字符串连接更有用,而且是直接生成 Scala AST 树而不是在运行时运行 Scala 解析器。此外,由于每个节点代码的生成规则不需要知晓其子节点是如何构建的,因此它们是高度可组合的。最后,如果 Catalyst 缺少表达式级别的优化,Scala 编译器会对代码进行进一步的优化。下图展示了 Quasiquotes 生成的代码性能近似于手动优化的程序性能。

【译】深入理解 Spark SQL 的 Catalyst 优化器

我们发现了 Quasiquotes 可以直接用于代码生成,而且我们观察到即使是 Spark SQL 新的提交者也可以快速增加新的表达式类型规则。Quasiquotes 也与我们运行在原生 Java 对象的目标相契合:当需要访问对象中的字段时,我们通过代码生成直接访问需要的字段,而不必拷贝对象到一个 Spark SQL 的 Row 中然后使用 Row 的访问方法。最后,将代码生成评估与没有生成代码的表达式解析评估结合起来也非常便捷,因为我们编译的 Scala 代码可直接在表达式解析器中调用。

总共, Catalyst 代码生成大概 700 行代码

本篇博客覆盖了 Spark SQL 的 Catalyst 优化器的内部实现。全新的简单的设计使得 Spark 社区可以快速建立原型,实现和扩展引擎。可以通过论文中剩余的部分。如果你参加今年的 SIGMOD,请来参加我们的分享吧!

更多关于 Spark SQL 的信息:

参考


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web 2.0 Architectures

Web 2.0 Architectures

Duane Nickull、Dion Hinchcliffe、James Governor / O'Reilly / 2009 / USD 34.99

The "Web 2.0" phenomena has become more pervasive than ever before. It is impacting the very fabric of our society and presents opportunities for those with knowledge. The individuals who understand t......一起来看看 《Web 2.0 Architectures》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具