作者简介:马阳阳 达达集团数据平台高级开发工程师,负责达达集团计算引擎相关的维护和开发工作
本文主要介绍了达达集团使用基于开源的Flink Stream SQL开发的Dada Flink SQL进行实时计算任务SQL化过程中的实践经验
01
背景
时间回到2018年,在数据平台和数据团队的共同努力下,我们已经有了完整的离线计算流程,完善的离线数仓模型,也上线了很多的数据产品和大量的数据报表。随着业务的发展,我们也逐渐面临着越来越多的实时计算方面的需求。随着Flink在国内的逐渐流行,实时计算也越来越多地进入我们的视野。当时,Flink的SQL功能还不完善,大量数据开发需要的功能无法使用SQL表达。因此,我们的选择和很多公司的选择类似,通过对Flink的框架和API进行封装,降低我们的数据开发人员进行实时任务开发的难度。针对这些需求我们计划通过一些封装,使得数据开发同学无需开发 Java 或者Scala代码,专注于业务逻辑的开发。由于开发资源有限,我们倾向于通过引进一些开源的框架并进行定制性的开发来完成这个任务。通过一些调研,我们锁定了袋鼠云的Flink Stream SQL(以下简称FSL)和Uber的AthenaX。对比后,FSL的丰富的插件、开发的活跃度和支持的相对完善对于我们更有吸引力。因此,我们引进了袋鼠云的FSL,并基于FSL开发了达达的SQL计算引擎Dada Flink SQL(以下简称DFL),并以此进行实时计算任务的SQL化。
02
架构
首先介绍一下DFL的架构。DFL中的主要组件为launcher、core、source插件、sink插件、Flink Siddhi插件以及side插件,其中Flink Siddhi为我们根据开源的Flink Siddhi接入的基于Siddhi的规则引擎,后面我们会有专门的文章介绍Flink Siddhi相关的内容和我们做的封装。launcher负责加载必要的source/side/sink插件,并将Flink program提交到Flink集群,支持session cluster模式和single job模式。core模块负责解析SQL语句,生成SQLTree,并根据解析的source、sink、Flink Siddhi和side内容加载相应的插件,生成必要的组件并注册进Flink TableEnvironment。之后,根据SQL是否使用了维表JOIN的功能 ,会选择直接调用TableEnvironment.sqlUpdate()或者进行维表JOIN的处理。除维表JOIN之外,根据我们数据开发同学的需求,我们还加入了INTERVAL JOIN的支持。使用流程表示,DFL的整体流程如下图所示。
2.1 Parser
DFL使用Parser来解析SQL语句,解析为相应的数据结构,并放入SqlTree进行管理以便后续使用。Parser定义了良好的接口,易于通过增加新的实现类来增加对新的SQL语法的支持。Parser的接口定义如下:
其中match用于判断一个具体的Parser的实现能否实现对给定的SQL语句的解析,verifySyntax为我们新增加的接口功能,用于验证给定SQL的语法是否正确,并将相关的错误信息放入errorInfo中供调用方使用,parserSql实现具体的SQL语法的解析工作。我们为IParser增加了很多的实现以实现新的功能,例如增加对Flink Siddhi的支持等。
2.2 维表JOIN
DFL中包含两种维表JOIN的实现方式:ALL及SIDE方式。ALL方式会将需要JOIN的数据一次性读取并缓存到Task的内存中,并可以设置定期刷新缓存;SIDE方式则在需要进行JOIN时从相应的数据源中读取相应的数据,并根据设置决定是否将读取到的数据缓存在内存中。ALL和SIDE模式相应的抽象类的定义分别为AllReqRow和AsyncReqRow,他们都实现了共同的接口ISideReqRow,ISideReqRow中定义了用于将事实表的数据和维表读取的数据进行JOIN的方法Row fillData(Row input, Object sideInput)。AllReqRow和AsyncReqRow的定义分别如下:
可以看到其中使用了模板方法的设计模式。
AsyncSideReqRow主要提供了初始化LRU缓存,从LRU缓存中获取数据以及从数据源或者LRU缓存中无法找到需要JOIN的数据时的默认处理方法。
03
增加的功能及改进
开发DFL的过程中,根据一些业务相关的需求及简化数据开发人员使用DFL的需要,我们在原生FSL的基础上进行了大量的改进和扩展的工作,下面介绍一些我们在DFL上做的工作。
3.1 Flink HA模式下,SESSION模式提交任务超时
为了Flink任务有较好的容错性,我们为Flink集群配置了基于ZooKeper的HA。出于任务管理和维护的需要,我们的一些Flink任务使用了session模式,在将这些任务迁移到DFL后,发现提交任务时,会报超时的错误。查阅Flink的官方文档也没有发现线索。后面经过我们的探索,发现了在YARN session模式下,配置了HA时,进行任务提交需要指定high-availability.cluster-id。添加了如下代码后,SESSION模式下,任务可以正常提交了。
3.2 Kafka支持使用SQL关键字作为JSON的字段名
当在Flink中使用了SQL关键字作字段名时,即使将字段名用反引号包起来,依然会报如下的错误:
这个是Flink的bug,已经在1.10.1中作了修复,详见这个issue:https://issues.apache.org/jira/browse/FLINK-16526。我们使用的版本为Flink 1.6.2,无法使用这个修复。我们的做法是支持将Kafka中JSON的字段名和引用这个JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用该JSON字段,而用于JSON解析的还是原始的JSON字段名。具体来说,我们在元数据系统中,支持为Kafka类型的表注册一个可选的sourceName。如果注册了sourceName,Flink Stream SQL将使用sourceName去JSON中解析对应的字段。
3.3 元数据整合
DFL上线后,通过添加必要的功能,使用纯SQL开发已经满足我们的很多实时任务开发的需求。但是在DFL运行一段时间后,我们注意到了管理各种上下游存储的信息给我们的数据开发人员带来的困扰。我们线上使用的存储系统包括了Kafka、HBase、ElasticSearch、 Redis 和MySQL(之后又引入了ClickHouse)。这些数据源基本都是异构的,连接及用户信息各异,而且在不同的任务中使用相同的数据源,每次都需要使用CREATE TABLE
元数据管理系统开发完成后,我们将Flink Stream SQL和元数据管理系统进行了深度集成。通过引入USE TABLE <> AS <> WITH ()的语法,我们的数据开发人员只需要将数据源在元数据管理系统中进行注册 ,之后在Flink Stream SQL中引用注册后的表就无需再填写任何连接信息,而且如果需要引用所有的字段的话,也无需再填写字段信息。如果不想要引用所有的子段,有两种办法可以做到。第一种方法是在USE TABLE的WITH里面使用columns表达需要引用的字段,第二种方法是在元数据系统里注册一张只包含了要引用的字段的表。
3.4 Redis hash/set数据类型的支持
FSL已经内置了对Redis作为sink table和side table的支持,但是FSL只支持Redis的String类型的数据,而我们的场景会使用到Redis的hash和set类型的数据,因此我们需要添加对Redis这两种数据类型的支持。首先介绍一下将Redis中的数据映射到Flink中的表的方法,在我们的Redis的key中包含了两部分的内容(使用":"分隔),两部分分别为固定的keyPrefix和由一到多个字段的值使用":"拼接的primaryKey,其中keyPrefix模拟表的概念,也方便Redis中存储的内容的管理。对String类型的数据,Redis的key会在上面介绍的key的基础上拼接上字段名称(使用":"作为分隔符),并以字段的值作为该key对应的value写入Redis中;对Hash类型的数据,Redis的完整的key就为上面介绍的key,hash的key则由用户指定的字段的值使用":"拼接而成,类似的,hash的value由用户指定的字段的值拼接而成。除了Redis hash和set数据类型的支持之外,我们还为Redis增加了setnx和hsetnx以及TTL的功能。
3.5 ClickHouse sink的支持
FSL内置了对Kafka、 MySQL 、Redis、Elasticsearch和HBbase等数据源作为目标表的支持,但是我们在使用的过程中也遇到了一些新的数据源作为目标写入端的要求,为此我们开发了新的sink插件来支持这种需求。我们开发和维护的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink为例介绍一下我们在这方面所做的一些工作。
对于ClickHouse,我们开发了实现了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通过实现CheckpointedFunction并在snapshotState()方法中将数据刷写到ClickHouse来确保数据不会丢失。为了处理不同的输入数据类型,我们提供接口ClickhouseMapper
不同于通常情况下由用户提供sink表的schema的方式,我们通过执行DESC