内容简介:本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
导言
本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
-
支持流式数据写入,并保证端到端的不重不丢(即exactly-once);
-
尽量减少中间环节,能支持更实时(甚至是T+0)的读取或导出,给下游提供更实时更准确的基础数据;
-
支持ACID,避免脏读等错误发生;
-
支持修改已落地的数据。虽然大数据和数据湖长于处理静态的、或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
-
支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入Iceberg作为Flink sink
为了解决上述痛点,我们引入了Iceberg作为数据落地的格式。Iceberg支持ACID事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入Flink作为流式处理框架,并将Iceberg作为Flink sink。
下文主要介绍Flink Iceberg sink的实现框架和要点。但在这之前,需要先介绍一些实现中用到的Flink基本概念。
Flink基本概念
从Flink的角度如何理解"流"和"批"
有界 | 开始 | 结束 | 处理时机 | 处理要求SLA | 处理顺序是否重要 | |
---|---|---|---|---|---|---|
流 | 否 | 有 | 否 | 实时,尽快 | 低延迟, 不重不丢即exactly-once | 是 |
批 | 是 | 有 | 有 | 可以等到全部收到后再处理 | 高吞吐 | 否 |
Flink使用DataFrame API来统一的处理流和批数据。
Stream, Transformation和Operator
一个Flink程序由 stream
和 transformation
组成:
-
Stream
: Transformation之间的中间结果数据; -
Transformation
:对(一个或多个)输入stream进行操作,输出(一个或多个)结果stream。
当Flink程序执行时,其被映射成 Streaming Dataflow
,由如下的部分组成:
-
Source (operator):接收外部输入给Flink;
-
Transformation (operator):中间对stream做的任何操作;
-
Sink (operator):Flink输出给外部。
下图为Flink官网的示例,展示了一个以Kafka作为输入Source,经过中间两个transformation,最终通过sink输出到Flink之外的过程。
State, Checkpoint and Snapshot
Flink依靠checkpoint和基于snapshot的恢复机制,保证程序state的一致性,实现容错。
Checkpoint是对分布式的数据流,以及所有operator的state,打snapshot的过程。
State
一个operator的state,即它包含的所有用于恢复当前状态的信息,可分为两类:
-
系统state:如operator中对数据的缓存。
-
用户自定义state:和用户逻辑相关,可以利用Flink提供的managed state,如ValueState、ListState,来存储。
State的存储位置,可以分为:
-
Local:内存,或者本地磁盘
-
State backend:远端的持久化存储,如HDFS。
如下图所示:
Checkpoint
Flink做checkpoint的过程如下:
-
Checkpoint coordinator首先发送barrier给source。
-
Source做snapshot,完成后向coordinator确认。
-
Source向下游发送barrier。
-
下游operator收到所有上游的barrier后,做snapshot,完成后向coordinator确认。
-
继续 往下游发送barrier,直到sink。
-
Sink通知coordinator自己完成checkpoint。
-
Coordinator确认本周期snapshot做完。
如下图所示:
Barrier
Barrier是Flink做分布式snapshot的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个barrier唯一对应checkpoint id,所以数据流中的record实际被barrier分组,如下图所示,barrier n和barrier n-1之间的record,属于checkpoint n。
Barrier的作用是在分布式的数据流中,将operator的多个输入流按照checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述Flink的基本概念,这些概念又是如何被应用和映射到Flink Iceberg sink当中的呢?
总体框架
如图,Flink Iceberg sink有两个主要模块和两个辅助模块组成:
模块 | 类型 | 功能 | 多个并行 |
---|---|---|---|
Writer | StreamOperator | 累积数据,生成DataFile | 是 |
Committer | SinkFunction | 把DataFile填入manifest file,并commit给Iceberg | 否,唯一 |
SinkAppender | 辅助辅助 | 把Writer和Committer接入DataStream | - |
AvroSerializer | 辅助 | 把输入转化为Avro IndexedRecord,输出给writer | - |
实现要点
Writer
-
在当前的实现中,Java的Map<String, Object>作为每条记录,输入给writer。内部逻辑先将其转化为作为中间格式的Avro IndexedRecord,而后通过Iceberg里的Parquet相关API,累积的写入DataFile。
-
使用Avro作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过ISSUE-870来去掉Avro,进而使用Iceberg内建的数据类型作为输入,同时也需要加入一个到Flink内建数据类型的转换器。
-
在做checkpoint的过程中,发送writer自己的barrier到下游的committer之前,关闭单个Parquet文件,构建DataFile,并发送DataFile的信息给下游。
Committer
-
全局唯一的Committer在收到上游所有writer的barrier以后,将收到的DataFile的信息填入manifest file,并使用ListState把manifest file作为用户自定义的state,保存于snapshot中。
-
当checkpoint完成以后,通过merge append将manifest file提交给Iceberg。Iceberg内部通过后续的一系列操作完成commit。最终让新加入的数据对其他的读任务可见。
试用Flink Iceberg sink
社区上https://github.com/apache/incubator-iceberg/pull/856提供了可以试用的原型代码。下载该patch放入master分支,编译并构建即可。如下的程序展示了如何将该sink嵌入到Flink数据流中:
// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
META_STORE_URIS);
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_WAREHOUSE);
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
// Create Iceberg table
Schema schema = new Schema(
...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
// Obtain an execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(...);
// Add Source
DataStream<Map<String, Object>> dataStream =
env.addSource(source, typeInformation);
// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
new IcebergSinkAppender<Map<String, Object>>(conf, "test")
.withSerializer(MapAvroSerializer.getInstance())
.withWriterParallelism(1);
appender.append(dataStream);
// Trigger the execution
env.execute("Sink Test");
后续规划
Flink Iceberg sink有很多需要完善的地方,例如:上文中提到的去掉Avro作为中间格式;以及在各种失败的情况下是否仍能保证端到端的exactly-once;按固定时长做checkpoint,在高低峰时生成不同大小的DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
参考
[1] Iceberg官网:https://iceberg.apache.org/
[2] Flink 1.10文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix提供的Flink Iceberg connector原型:https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink设计文档:https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink容错机制(checkpoint) https://www.cnblogs.com/starzy/p/11439988.html
腾讯大数据诚招计算、存储、消息中间件、调度、中台等各方向的大数据研发工程师,请私信或联系jerryshao@tencent.com 。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 爬虫入库实战之干死反爬虫
- redis缓存队列+MySQL +php任务脚本定时批量入库
- 如何在django里上传csv文件并进行入库处理
- bp(net core)+easyui+efcore实现仓储管理系统——入库管理之二(三十八)
- HBase场景 | 对比MySQL,一文看透HBase的能力及使用场景
- 容器的应用场景
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
UNIX 时间戳转换
UNIX 时间戳转换
HEX CMYK 转换工具
HEX CMYK 互转工具