Data Source V2 聚合下推

栏目: 数据库 · 发布时间: 5年前

内容简介:当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了

背景和动机

当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了 Spark 查询的执行速度,但根据数据源的不同,它们仅支持将部分功能下推到数据源并执行。我们正在开展一个提供通用数据源下推 API 的项目,作为该项目的一部分,本博客将介绍我们有关聚合下推的工作。我们开放了 Spark jira 22390 来解决此问题,设计文档已在 jira 中。

过滤器下推实现

SQL 语句中,过滤器通常用于选择满足给定条件的行。在 Spark 中,可以使用以下实现将过滤器下推到数据源层:

  1. 逻辑计划过滤器包含在 Catalyst Expression 中。
  2. 一个 Catalyst Expression 被转换为 数据源过滤器
  3. 如果该 Catalyst Expression 无法转换为数据源 过滤器 ,或者不受数据源支持,那么将在 Spark 层上处理它。
  4. 否则,会将它下推到数据源层。

聚合下推

SQL 中经常使用聚合函数利用一组输入值来计算单个结果。最常用的聚合函数是 AVG、COUNT、MAX、MIN 和 SUM。如果 SQL 语句中的聚合得到与 Spark 具有相同语义的数据源的支持,那么可以将这些聚合下推到数据源级别,以提升性能。性能提升主要表现在两个领域:

  • Spark 与数据源之间的网络 IO 显著减少。
  • 由于索引的存在,数据源中的聚合计算速度变得更快。

聚合通常与过滤器结合使用。例如:

Select sum(i) From T Where i > 3 Group by j Having sum(i) > 10

下图展示了如何使用 Spark Data Source v2 下推上述过滤器并进行聚合。

Data Source V2 聚合下推

Data Source V2 聚合下推

DSV:数据源视图

AGG:聚合

FilterPD: 过滤器下推

AGGPD:聚合下推

下推聚合和无下推聚合的逻辑/物理计划

让我们看看前一个 SQL 语句对于下推和非下推情况的逻辑和物理计划。在 Spark 中,前面的 SQL 语句可以编写为 df.filter('i > 3).groupBy('j).agg(sum($"i")).where(sum('i) > 10) 在聚合下推前,只有过滤器 i> 3 被下推到数据源。但聚合函数 sum (i) 和聚合过滤器 sum(i) > 10 仍在 Spark 层。下面给出了经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
Project [j#1, sum(i)#10L]
+- Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- Aggregate [j#1], [j#1, sum(cast(i#0 as bigint)) AS sum(i)#10L,
     sum(cast(i#0 as bigint)) AS sum(cast(i#0 as bigint))#21L]
      +- DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int,
       j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]


== Physical Plan ==
*Project [j#1, sum(i)#10L]
+- *Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- *HashAggregate(keys=[j#1], functions=[sum(cast(i#0 as bigint))],
     output=[j#1, sum(i)#10L, sum(cast(i#0 as bigint))#21L])
       +- Exchange hashpartitioning(j#1, 5)
      +- *HashAggregate(keys=[j#1], functions=[partial_sum(cast(i#0 as
          bigint))], output=[j#1, sum#24L])
      +- DataSourceV2Scan(source=AdvancedDataSourceV2,
      schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]

聚合下推后,它具有以下经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

== Physical Plan ==
DataSourceV2Scan(source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

使用 TPCDS 1TB 设置的性能结果

在具有大量聚合的工作负载中,此功能的早期原型表现出很大的改进。下面是一些结果:

测试 1(含 group by、完全下推、无 partition)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk

无聚合下推:782.287 秒

聚合下推:250.331 秒

性能提升:约 3 倍的提升

测试 2(无 group by、完全下推、无 partition)

SELECT avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost) FROM store_sales

无聚合下推:2219.104 秒

聚合下推:839.664 秒

性能提升:约 2.6 倍的提升

测试 3(含 group by、完全下推、partition 列与 group by 列相同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_date_sk

无聚合下推:588.918 秒

聚合下推:296.763 秒

性能提升:约 2 倍的提升

测试 4(含 group by、完全下推、partition 列与 group by 列不同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_time_sk

无聚合下推:344.509 秒

聚合下推:225.186 秒

性能提升:约 1.5 倍的提升

本文翻译自 : Data Source V2 aggregate push down (2018-10-23)


以上所述就是小编给大家介绍的《Data Source V2 聚合下推》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

领域驱动设计

领域驱动设计

[美] Eric Evans / 赵俐、盛海艳、刘霞 / 人民邮电出版社 / 2016-6-1 / 69

本书是领域驱动设计方面的经典之作,修订版更是对之前出版的中文版进行了全面的修订和完善。 全书围绕着设计和开发实践,结合若干真实的项目案例,向读者阐述如何在真实的软件开发中应用领域驱动设计。书中给出了领域驱动设计的系统化方法,并将人们普遍接受的一些实践综合到一起,融入了作者的见解和经验,展现了一些可扩展的设计新实践、已验证过的技术以及便于应对复杂领域的软件项目开发的基本原则。一起来看看 《领域驱动设计》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

html转js在线工具
html转js在线工具

html转js在线工具