Data Source V2 聚合下推

栏目: 数据库 · 发布时间: 5年前

内容简介:当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了

背景和动机

当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了 Spark 查询的执行速度,但根据数据源的不同,它们仅支持将部分功能下推到数据源并执行。我们正在开展一个提供通用数据源下推 API 的项目,作为该项目的一部分,本博客将介绍我们有关聚合下推的工作。我们开放了 Spark jira 22390 来解决此问题,设计文档已在 jira 中。

过滤器下推实现

SQL 语句中,过滤器通常用于选择满足给定条件的行。在 Spark 中,可以使用以下实现将过滤器下推到数据源层:

  1. 逻辑计划过滤器包含在 Catalyst Expression 中。
  2. 一个 Catalyst Expression 被转换为 数据源过滤器
  3. 如果该 Catalyst Expression 无法转换为数据源 过滤器 ,或者不受数据源支持,那么将在 Spark 层上处理它。
  4. 否则,会将它下推到数据源层。

聚合下推

SQL 中经常使用聚合函数利用一组输入值来计算单个结果。最常用的聚合函数是 AVG、COUNT、MAX、MIN 和 SUM。如果 SQL 语句中的聚合得到与 Spark 具有相同语义的数据源的支持,那么可以将这些聚合下推到数据源级别,以提升性能。性能提升主要表现在两个领域:

  • Spark 与数据源之间的网络 IO 显著减少。
  • 由于索引的存在,数据源中的聚合计算速度变得更快。

聚合通常与过滤器结合使用。例如:

Select sum(i) From T Where i > 3 Group by j Having sum(i) > 10

下图展示了如何使用 Spark Data Source v2 下推上述过滤器并进行聚合。

Data Source V2 聚合下推

Data Source V2 聚合下推

DSV:数据源视图

AGG:聚合

FilterPD: 过滤器下推

AGGPD:聚合下推

下推聚合和无下推聚合的逻辑/物理计划

让我们看看前一个 SQL 语句对于下推和非下推情况的逻辑和物理计划。在 Spark 中,前面的 SQL 语句可以编写为 df.filter('i > 3).groupBy('j).agg(sum($"i")).where(sum('i) > 10) 在聚合下推前,只有过滤器 i> 3 被下推到数据源。但聚合函数 sum (i) 和聚合过滤器 sum(i) > 10 仍在 Spark 层。下面给出了经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
Project [j#1, sum(i)#10L]
+- Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- Aggregate [j#1], [j#1, sum(cast(i#0 as bigint)) AS sum(i)#10L,
     sum(cast(i#0 as bigint)) AS sum(cast(i#0 as bigint))#21L]
      +- DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int,
       j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]


== Physical Plan ==
*Project [j#1, sum(i)#10L]
+- *Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- *HashAggregate(keys=[j#1], functions=[sum(cast(i#0 as bigint))],
     output=[j#1, sum(i)#10L, sum(cast(i#0 as bigint))#21L])
       +- Exchange hashpartitioning(j#1, 5)
      +- *HashAggregate(keys=[j#1], functions=[partial_sum(cast(i#0 as
          bigint))], output=[j#1, sum#24L])
      +- DataSourceV2Scan(source=AdvancedDataSourceV2,
      schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]

聚合下推后,它具有以下经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

== Physical Plan ==
DataSourceV2Scan(source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

使用 TPCDS 1TB 设置的性能结果

在具有大量聚合的工作负载中,此功能的早期原型表现出很大的改进。下面是一些结果:

测试 1(含 group by、完全下推、无 partition)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk

无聚合下推:782.287 秒

聚合下推:250.331 秒

性能提升:约 3 倍的提升

测试 2(无 group by、完全下推、无 partition)

SELECT avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost) FROM store_sales

无聚合下推:2219.104 秒

聚合下推:839.664 秒

性能提升:约 2.6 倍的提升

测试 3(含 group by、完全下推、partition 列与 group by 列相同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_date_sk

无聚合下推:588.918 秒

聚合下推:296.763 秒

性能提升:约 2 倍的提升

测试 4(含 group by、完全下推、partition 列与 group by 列不同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_time_sk

无聚合下推:344.509 秒

聚合下推:225.186 秒

性能提升:约 1.5 倍的提升

本文翻译自 : Data Source V2 aggregate push down (2018-10-23)


以上所述就是小编给大家介绍的《Data Source V2 聚合下推》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

SQL完全手册

SQL完全手册

格罗夫 / 电子工业 / 2006-6 / 68.00元

本书为专业和非专业用户、程序员、数据处理方面的专业人士和希望理解sQL在今天计算机产业中的影响的经理们提供了关于SQL语言的全面深入的介绍。本书为理解和使用SQL提供了一个概念上的框架,描述了SQL的历史和SQL的标准,解释了SQL在各种计算机产业领域(如企业级数据处理、数据仓库、Web站点体系结构)中的作用。这一版包含一些新的章节,专门讲述SQL在应用服务器体系结构中的作用、sQL与xML的集成......一起来看看 《SQL完全手册》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换