内容简介:01—
“本文分析SparkSQL ThriftServer工作原理,修改Spark SQL源代码并实现了 SQL 查询进度的计算,最后展示了一点资讯基于Presto+SparkSQL+Hive的Web查询引擎 ”
01
—
问题背景:SparkSQL ThriftServer 无法获取查询进度
目前公司的分布式Adhoc查询有以下几种:Presto,Hive,SparkSQL。公司外部开源系统里比较知名的有Redash,Zeppelin等查询工具。
以上查询 工具 均支持:Hive、SparkSQL、 Mysql 等查询数据源,SQL格式化,结果进行 排序 等。Redash还支持语法动态提示等功能,非常的方便。
然而很遗憾的是: 上述开源工具,无论是对接SparkSQL还是对接HiveServer2都有一个致命的问题就是—— 无法知晓查询的进度 ,甚至是像Presto这种本身带有进度的查询引擎,在接入Redash等系统后,也无法知晓当前查询的时间、进度、剩余时间等。
查询进度重要吗?
从人机交互的角度看,快查询进度条意义不大,但是Hive等MR查询 长时间的盲目等待是难以接受的(Hive命令行提供大致的MR进度展示)。 如MysqlWorkBench和Navicat Mysql等工具
-
Mysql查询大都是毫秒级别返回,不显示进度条。
-
导入导出数据库时以上工具都有进度条,即长时间作业需要进度条。
我们的目标是 :深入分析SparkSQL ThriftServer的工作过程,并通过修改源代码等方式来获取执行的状态,让Presto查询、Hive查询、SparkSQL查询均能感知到进度。本文重点介绍了SparkSQL查询进度的实现。
02
—
SparkSQL ThriftServer工作原理剖析
图1: SparkSQL ThriftServer工作的5个步骤
从上图可知,SparkSQL ThriftServer的启动过程分 5个步骤。 在步骤3 ExecuteStatement时,SparkSQL 为每一个查询创建Spark的Job,并随机生成UUID。
一点大数据团队修改SparkSQL源代码实现定制化的JobID生成,并跟踪每个SparkJob的运行过程实现SQL执行进度的计算。
1:SparkSQL初始化
图2: SparkSQL ThriftServer 初始化
如上图所知,SparkSQL ThriftServer初始化包含以下几个步骤:
-
Java Main函数启动,初始化HiveServer2,创建 ThriftBinaryCLIService
-
创建Hive模块内的 ThriftBinaryCLIService 并传入 SparkSQL模块内的SparkSqlCliService
-
创建SparkSQLSessionManager 通过反射方式将CliService中的SessionManager设置为自身
-
使用反射是 因为Hive代码中的CLiService的SessonManager为 Private方法
-
Thrift BinaryService启动,并根据配置的ThriftSever端口、地址等信息创建TThreadPoolServer。
2:建立链接OpenSession:
客户端(Beeline CLI) 向服务器ThriftBinaryServer发送OpenSession的Thrift请求:
-
SparkSQLCLIService 调用初始化过程中创建的SessionManager 创建Session
-
此时根据配置的不同为Thrift的会话绑定一个SparkSession
-
每个Session设置自己的SessionManager和OperationManager
-
SessonManager用于管理整个Hive的查询周期,比如建立链接、关闭链接等。
-
OperationManager 完成具体任务:用于执行具体的任务逻辑比如查看表信息、查看元数据
-
SparkSQL重写OperationManager的 newExecuteStatementOperation()方法, 转换为Spark作业
3:发起SQL查询ExecuteStatement(重点)
图3: Beeline 客户端发起查询SQL请求到 SparkSQL Server
-
客户端向服务器ThriftBinaryServer发送ExecuteStatement的Thrift请求,服务器接到请求后获取当且Thrift会话中的 HiveSession ,并调用 ExecuteStatement 方法。
-
继续调用 executeStatementInternal 函数。此方法调用 步骤2:建立链 接OpenSession : 中初始化好的 OperationManager执行 newExecuteStatementOperation ()
-
HiveSession继续调用重写的 runInternal ()方法,将Hive请求进一步调用 execute() 来完成Spark的计算逻辑。
-
Spark收到查询请求后,在 SparkExecuteStatementOperation 中为每一个查询语句随机生成UUID作为JobId:
-
此处是关键点即Spark通过UUID来识别每一个的Job,并在Spark UI显示
-
而UUID和SQL语句间没有直接的关联
-
Spark执行SparkContext.Sql() 函数直接计算获取结果的DataFrame,结果计算完成将statement标记为完成状态。返回给客户端 OperationHandle 句柄作为查询结果的依据。
4: 客户端获取结果FetchResults
客户端检查Opeartion的状态,当发现是FINISHED状态时候,请求结果的元数据Meta信息和结果内容信息,分别由客户端的 FetchResults 请求和 GetResultSetMetadata 请求
-
Spark 将DataFrame的每一行转换为Thrift结果的Row
-
Spark 将DataFrame的 DataFrame的行列信息转换为 Hive的元数据信息。
5:关闭并释放链接
03
—
设计原理与代码修改
步骤1: 一点查询客户端根据SQL生成UUID并保存对应关系:
-
我们根据Hive JDBC接口,封装了自己的查询客户端,并生成唯一的查询ID
-
QueryID 样例:
-
20181014_101745_57dbf79bc1e1f27f82911b00b91ddcde
-
客户端通过 jdbc:hive2://IP:Port/dw? shark.sparksql.queryid=$QueryID 这样的连接来访问后端的ThriftServer。
步骤2: 服务器接受SQL和QueryID,将Spark的JobID设置为QueryID:
图4: 如何实现HiveJDBC客户端传递UUID到服务器端
-
Spark ThriftServer 建立连接请求的时候,根据Url中的参数信息,HiveSession在建立链接的时候解析 JDBC:hive2 url参数 并且将参数 “shark.sparksql.queryid“ , 以Key-Value 的形式存放于 HiveConf
-
在OperationManager中,当创建ExecuteStatementOperation时候,读取所有HiveConf中的配置信息,写入到SparkContxt的配置中。
-
在 执行 ExecuteStatementOperation 时,取上下文 “shark.sparksql.quer yid" 对应的值,并将其设置为JobID, 此ID在SparkUI中可以查询到
-
statementId=sqlContext.getConf("shark.sparksql.queryid", statementId)
修改后的SparkSQL执行页面中JobID已经生效
步骤3: 客户端读取 Spark RESTful API 获取查询进度:
我们采用Spark的Restful API来获取每一个查询ID的进展情况
Spark RESTful API的访问地址为
http://HOST:4041/api/v1/applications
-
Step1:查询当前Spark的ApplicationID:http://HOST:4041/api/v1/applications
-
Step2:查询当前Spark的所有Job列表:http://HOST:4041/api/v1/applications/application_id/jobs application_id为 Step_1中 取到的application_id
-
Step3: 根据QueryID,可以获取到当前QueryID下所有任务数(numTasks),已经完成的任务数(numCompletedTasks),跳过的Task数(numSkippedTasks)
-
计算公式为:完成任务数+跳过任务数)/(总任务数)
图6: Spark 的Restful API获取任务状态
步骤4:效果展示
通过以上对SparkSQL的改进,我们的Shark 一点大数据查询系统可以支持SparkSQL的进度查询。
图7: 蓝色为SparkSQL 查询进度条 当前进度9%
另外我们通过SQL语法分析工具、LeaderLatch(ZK Leader选择)等技术实现了SQL格式化、高可用的Presto等,使得我们的Shark查询引擎达到了生产系统的要求。
图8: 一点大数据查询系统与Redash、Zeppelin对比分析
04
—
未来规划
1: SparkSQL ThriftServer的分布式化:
-
智能的客户端根据查询类型、数据分区大小提交至不同的SparkSQL ThriftServer。
-
对数据取样、小查询等建立单独的SparkSQL查询后端服务,提高查询的响应速度对大数据量、高内存占用查询分配大内存的Executor保障查询的稳定性。
-
统一的Web客户端维护查询ID和后端SparkThriftServer的关系,支持失败重试、同时提交等功能。
2 : 用户无感知的 智能查询引擎选择
-
综合利用Presto、SparkSQL、Hive等查询系统。短查询优先Presto,长查询优先Hive
-
统一查询语法,消除Presto语法和Hive语法直接的不同。
-
Presto和Hive的UDF不同,需要建立统一的UDF管理系统实现语法和代码的统一管理。
以上所述就是小编给大家介绍的《一点资讯 SparkSQL 查询引擎实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 火山引擎 Redis 云原生实践
- 微博大数据即席查询 (OLAP) 引擎实践
- 实时计算引擎在贝壳的应用与实践
- 最佳实践 | OceanBase事务引擎的技术创新
- 美团无人车引擎在仿真中的实践
- 字节跳动在 RocksDB 存储引擎上的改进实践
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Smashing Book
Jacob Gube、Dmitry Fadeev、Chris Spooner、Darius A Monsef IV、Alessandro Cattaneo、Steven Snell、David Leggett、Andrew Maier、Kayla Knight、Yves Peters、René Schmidt、Smashing Magazine editorial team、Vitaly Friedman、Sven Lennartz / 2009 / $ 29.90 / € 23.90
The Smashing Book is a printed book about best practices in modern Web design. The book shares technical tips and best practices on coding, usability and optimization and explores how to create succes......一起来看看 《The Smashing Book》 这本书的介绍吧!