内容简介:01—
“本文分析SparkSQL ThriftServer工作原理,修改Spark SQL源代码并实现了 SQL 查询进度的计算,最后展示了一点资讯基于Presto+SparkSQL+Hive的Web查询引擎 ”
01
—
问题背景:SparkSQL ThriftServer 无法获取查询进度
目前公司的分布式Adhoc查询有以下几种:Presto,Hive,SparkSQL。公司外部开源系统里比较知名的有Redash,Zeppelin等查询工具。
以上查询 工具 均支持:Hive、SparkSQL、 Mysql 等查询数据源,SQL格式化,结果进行 排序 等。Redash还支持语法动态提示等功能,非常的方便。
然而很遗憾的是: 上述开源工具,无论是对接SparkSQL还是对接HiveServer2都有一个致命的问题就是—— 无法知晓查询的进度 ,甚至是像Presto这种本身带有进度的查询引擎,在接入Redash等系统后,也无法知晓当前查询的时间、进度、剩余时间等。
查询进度重要吗?
从人机交互的角度看,快查询进度条意义不大,但是Hive等MR查询 长时间的盲目等待是难以接受的(Hive命令行提供大致的MR进度展示)。 如MysqlWorkBench和Navicat Mysql等工具
-
Mysql查询大都是毫秒级别返回,不显示进度条。
-
导入导出数据库时以上工具都有进度条,即长时间作业需要进度条。
我们的目标是 :深入分析SparkSQL ThriftServer的工作过程,并通过修改源代码等方式来获取执行的状态,让Presto查询、Hive查询、SparkSQL查询均能感知到进度。本文重点介绍了SparkSQL查询进度的实现。
02
—
SparkSQL ThriftServer工作原理剖析
图1: SparkSQL ThriftServer工作的5个步骤
从上图可知,SparkSQL ThriftServer的启动过程分 5个步骤。 在步骤3 ExecuteStatement时,SparkSQL 为每一个查询创建Spark的Job,并随机生成UUID。
一点大数据团队修改SparkSQL源代码实现定制化的JobID生成,并跟踪每个SparkJob的运行过程实现SQL执行进度的计算。
1:SparkSQL初始化
图2: SparkSQL ThriftServer 初始化
如上图所知,SparkSQL ThriftServer初始化包含以下几个步骤:
-
Java Main函数启动,初始化HiveServer2,创建 ThriftBinaryCLIService
-
创建Hive模块内的 ThriftBinaryCLIService 并传入 SparkSQL模块内的SparkSqlCliService
-
创建SparkSQLSessionManager 通过反射方式将CliService中的SessionManager设置为自身
-
使用反射是 因为Hive代码中的CLiService的SessonManager为 Private方法
-
Thrift BinaryService启动,并根据配置的ThriftSever端口、地址等信息创建TThreadPoolServer。
2:建立链接OpenSession:
客户端(Beeline CLI) 向服务器ThriftBinaryServer发送OpenSession的Thrift请求:
-
SparkSQLCLIService 调用初始化过程中创建的SessionManager 创建Session
-
此时根据配置的不同为Thrift的会话绑定一个SparkSession
-
每个Session设置自己的SessionManager和OperationManager
-
SessonManager用于管理整个Hive的查询周期,比如建立链接、关闭链接等。
-
OperationManager 完成具体任务:用于执行具体的任务逻辑比如查看表信息、查看元数据
-
SparkSQL重写OperationManager的 newExecuteStatementOperation()方法, 转换为Spark作业
3:发起SQL查询ExecuteStatement(重点)
图3: Beeline 客户端发起查询SQL请求到 SparkSQL Server
-
客户端向服务器ThriftBinaryServer发送ExecuteStatement的Thrift请求,服务器接到请求后获取当且Thrift会话中的 HiveSession ,并调用 ExecuteStatement 方法。
-
继续调用 executeStatementInternal 函数。此方法调用 步骤2:建立链 接OpenSession : 中初始化好的 OperationManager执行 newExecuteStatementOperation ()
-
HiveSession继续调用重写的 runInternal ()方法,将Hive请求进一步调用 execute() 来完成Spark的计算逻辑。
-
Spark收到查询请求后,在 SparkExecuteStatementOperation 中为每一个查询语句随机生成UUID作为JobId:
-
此处是关键点即Spark通过UUID来识别每一个的Job,并在Spark UI显示
-
而UUID和SQL语句间没有直接的关联
-
Spark执行SparkContext.Sql() 函数直接计算获取结果的DataFrame,结果计算完成将statement标记为完成状态。返回给客户端 OperationHandle 句柄作为查询结果的依据。
4: 客户端获取结果FetchResults
客户端检查Opeartion的状态,当发现是FINISHED状态时候,请求结果的元数据Meta信息和结果内容信息,分别由客户端的 FetchResults 请求和 GetResultSetMetadata 请求
-
Spark 将DataFrame的每一行转换为Thrift结果的Row
-
Spark 将DataFrame的 DataFrame的行列信息转换为 Hive的元数据信息。
5:关闭并释放链接
03
—
设计原理与代码修改
步骤1: 一点查询客户端根据SQL生成UUID并保存对应关系:
-
我们根据Hive JDBC接口,封装了自己的查询客户端,并生成唯一的查询ID
-
QueryID 样例:
-
20181014_101745_57dbf79bc1e1f27f82911b00b91ddcde
-
客户端通过 jdbc:hive2://IP:Port/dw? shark.sparksql.queryid=$QueryID 这样的连接来访问后端的ThriftServer。
步骤2: 服务器接受SQL和QueryID,将Spark的JobID设置为QueryID:
图4: 如何实现HiveJDBC客户端传递UUID到服务器端
-
Spark ThriftServer 建立连接请求的时候,根据Url中的参数信息,HiveSession在建立链接的时候解析 JDBC:hive2 url参数 并且将参数 “shark.sparksql.queryid“ , 以Key-Value 的形式存放于 HiveConf
-
在OperationManager中,当创建ExecuteStatementOperation时候,读取所有HiveConf中的配置信息,写入到SparkContxt的配置中。
-
在 执行 ExecuteStatementOperation 时,取上下文 “shark.sparksql.quer yid" 对应的值,并将其设置为JobID, 此ID在SparkUI中可以查询到
-
statementId=sqlContext.getConf("shark.sparksql.queryid", statementId)
修改后的SparkSQL执行页面中JobID已经生效
步骤3: 客户端读取 Spark RESTful API 获取查询进度:
我们采用Spark的Restful API来获取每一个查询ID的进展情况
Spark RESTful API的访问地址为
http://HOST:4041/api/v1/applications
-
Step1:查询当前Spark的ApplicationID:http://HOST:4041/api/v1/applications
-
Step2:查询当前Spark的所有Job列表:http://HOST:4041/api/v1/applications/application_id/jobs application_id为 Step_1中 取到的application_id
-
Step3: 根据QueryID,可以获取到当前QueryID下所有任务数(numTasks),已经完成的任务数(numCompletedTasks),跳过的Task数(numSkippedTasks)
-
计算公式为:完成任务数+跳过任务数)/(总任务数)
图6: Spark 的Restful API获取任务状态
步骤4:效果展示
通过以上对SparkSQL的改进,我们的Shark 一点大数据查询系统可以支持SparkSQL的进度查询。
图7: 蓝色为SparkSQL 查询进度条 当前进度9%
另外我们通过SQL语法分析工具、LeaderLatch(ZK Leader选择)等技术实现了SQL格式化、高可用的Presto等,使得我们的Shark查询引擎达到了生产系统的要求。
图8: 一点大数据查询系统与Redash、Zeppelin对比分析
04
—
未来规划
1: SparkSQL ThriftServer的分布式化:
-
智能的客户端根据查询类型、数据分区大小提交至不同的SparkSQL ThriftServer。
-
对数据取样、小查询等建立单独的SparkSQL查询后端服务,提高查询的响应速度对大数据量、高内存占用查询分配大内存的Executor保障查询的稳定性。
-
统一的Web客户端维护查询ID和后端SparkThriftServer的关系,支持失败重试、同时提交等功能。
2 : 用户无感知的 智能查询引擎选择
-
综合利用Presto、SparkSQL、Hive等查询系统。短查询优先Presto,长查询优先Hive
-
统一查询语法,消除Presto语法和Hive语法直接的不同。
-
Presto和Hive的UDF不同,需要建立统一的UDF管理系统实现语法和代码的统一管理。
以上所述就是小编给大家介绍的《一点资讯 SparkSQL 查询引擎实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 火山引擎 Redis 云原生实践
- 微博大数据即席查询 (OLAP) 引擎实践
- 实时计算引擎在贝壳的应用与实践
- 最佳实践 | OceanBase事务引擎的技术创新
- 美团无人车引擎在仿真中的实践
- 字节跳动在 RocksDB 存储引擎上的改进实践
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。