Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

栏目: IT技术 · 发布时间: 4年前

内容简介:本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。

导言

本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。

应用场景

流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

上述的应用场景通常有如下的痛点,需要整个流程不断的优化:

  • 支持流式数据写入,并保证端到端的不重不丢(即exactly-once);

  • 尽量减少中间环节,能支持更实时(甚至是T+0)的读取或导出,给下游提供更实时更准确的基础数据;

  • 支持ACID,避免脏读等错误发生;

  • 支持修改已落地的数据。虽然大数据和数据湖长于处理静态的、或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;

  • 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。

引入Iceberg作为Flink sink

为了解决上述痛点,我们引入了Iceberg作为数据落地的格式。Iceberg支持ACID事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。

同时,为了支持流式数据的写入,我们引入Flink作为流式处理框架,并将Iceberg作为Flink sink。

下文主要介绍Flink Iceberg sink的实现框架和要点。但在这之前,需要先介绍一些实现中用到的Flink基本概念。

Flink基本概念

从Flink的角度如何理解"流"和"批"

有界 开始 结束 处理时机 处理要求SLA 处理顺序是否重要
实时,尽快 低延迟, 不重不丢即exactly-once
可以等到全部收到后再处理 高吞吐

Flink使用DataFrame API来统一的处理流和批数据。

Stream, Transformation和Operator

一个Flink程序由 streamtransformation 组成:

  • Stream : Transformation之间的中间结果数据;
  • Transformation :对(一个或多个)输入stream进行操作,输出(一个或多个)结果stream。

当Flink程序执行时,其被映射成 Streaming Dataflow ,由如下的部分组成:

  • Source (operator):接收外部输入给Flink;

  • Transformation (operator):中间对stream做的任何操作;

  • Sink (operator):Flink输出给外部。

下图为Flink官网的示例,展示了一个以Kafka作为输入Source,经过中间两个transformation,最终通过sink输出到Flink之外的过程。

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

State, Checkpoint and Snapshot

Flink依靠checkpoint和基于snapshot的恢复机制,保证程序state的一致性,实现容错。

Checkpoint是对分布式的数据流,以及所有operator的state,打snapshot的过程。

State

一个operator的state,即它包含的所有用于恢复当前状态的信息,可分为两类:

  • 系统state:如operator中对数据的缓存。

  • 用户自定义state:和用户逻辑相关,可以利用Flink提供的managed state,如ValueState、ListState,来存储。

State的存储位置,可以分为:

  • Local:内存,或者本地磁盘

  • State backend:远端的持久化存储,如HDFS。

如下图所示:

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

Checkpoint

Flink做checkpoint的过程如下:

  1. Checkpoint coordinator首先发送barrier给source。

  2. Source做snapshot,完成后向coordinator确认。

  3. Source向下游发送barrier。

  4. 下游operator收到所有上游的barrier后,做snapshot,完成后向coordinator确认。

  5. 继续 往下游发送barrier,直到sink。

  6. Sink通知coordinator自己完成checkpoint。

  7. Coordinator确认本周期snapshot做完。

如下图所示:

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

Barrier

Barrier是Flink做分布式snapshot的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。

由于每个barrier唯一对应checkpoint id,所以数据流中的record实际被barrier分组,如下图所示,barrier n和barrier n-1之间的record,属于checkpoint n。

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

Barrier的作用是在分布式的数据流中,将operator的多个输入流按照checkpoint对齐(align),如下图所示:

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

Flink Iceberg sink

了解了上述Flink的基本概念,这些概念又是如何被应用和映射到Flink Iceberg sink当中的呢?

总体框架

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

如图,Flink Iceberg sink有两个主要模块和两个辅助模块组成:

模块 类型 功能 多个并行
Writer StreamOperator 累积数据,生成DataFile
Committer SinkFunction 把DataFile填入manifest file,并commit给Iceberg 否,唯一
SinkAppender 辅助辅助 把Writer和Committer接入DataStream -
AvroSerializer 辅助 把输入转化为Avro IndexedRecord,输出给writer -

实现要点

Writer

  1. 在当前的实现中,Java的Map<String, Object>作为每条记录,输入给writer。内部逻辑先将其转化为作为中间格式的Avro IndexedRecord,而后通过Iceberg里的Parquet相关API,累积的写入DataFile。

  2. 使用Avro作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过ISSUE-870来去掉Avro,进而使用Iceberg内建的数据类型作为输入,同时也需要加入一个到Flink内建数据类型的转换器。

  3. 在做checkpoint的过程中,发送writer自己的barrier到下游的committer之前,关闭单个Parquet文件,构建DataFile,并发送DataFile的信息给下游。

Committer

  1. 全局唯一的Committer在收到上游所有writer的barrier以后,将收到的DataFile的信息填入manifest file,并使用ListState把manifest file作为用户自定义的state,保存于snapshot中。

  2. 当checkpoint完成以后,通过merge append将manifest file提交给Iceberg。Iceberg内部通过后续的一系列操作完成commit。最终让新加入的数据对其他的读任务可见。

试用Flink Iceberg sink

社区上https://github.com/apache/incubator-iceberg/pull/856提供了可以试用的原型代码。下载该patch放入master分支,编译并构建即可。如下的程序展示了如何将该sink嵌入到Flink数据流中:

// Configurate catalog

org.apache.hadoop.conf.Configuration hadoopConf =

new org.apache.hadoop.conf.Configuration();

hadoopConf.set(

org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,

META_STORE_URIS);

hadoopConf.set(

org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,

META_STORE_WAREHOUSE);

Catalog icebergCatalog = new HiveCatalog(hadoopConf);

// Create Iceberg table

Schema schema = new Schema(

...

);

PartitionSpec partitionSpec = builderFor(schema)...

TableIdentifier tableIdentifier =

TableIdentifier.of(DATABASE_NAME, TABLE_NAME);

// If needed, check the existence of table by loadTable() and drop it

// before creating it

icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);

// Obtain an execution environment

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing

env.enableCheckpointing(...);

// Add Source

DataStream<Map<String, Object>> dataStream =

env.addSource(source, typeInformation);

// Configure Ieberg sink

Configuration conf = new Configuration();

conf.setString(

org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,

META_STORE_URIS);

conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);

conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);

// Append Iceberg sink to data stream

IcebergSinkAppender<Map<String, Object>> appender =

new IcebergSinkAppender<Map<String, Object>>(conf, "test")

.withSerializer(MapAvroSerializer.getInstance())

.withWriterParallelism(1);

appender.append(dataStream);

// Trigger the execution

env.execute("Sink Test");

后续规划

Flink Iceberg sink有很多需要完善的地方,例如:上文中提到的去掉Avro作为中间格式;以及在各种失败的情况下是否仍能保证端到端的exactly-once;按固定时长做checkpoint,在高低峰时生成不同大小的DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。

参考

[1] Iceberg官网:https://iceberg.apache.org/

[2] Flink 1.10文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/

[3] Neflix提供的Flink Iceberg connector原型:https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg

[4] Flink Iceberg sink设计文档:https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing

[5] Flink容错机制(checkpoint) https://www.cnblogs.com/starzy/p/11439988.html

Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用

腾讯大数据诚招计算、存储、消息中间件、调度、中台等各方向的大数据研发工程师,请私信或联系jerryshao@tencent.com


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创京东

创京东

李志刚 / 中信出版社 / 2015-5-1 / CNY 49.80

1998年,刘强东创业,在中关村经销光磁产品。2004年,因为非典,京东偶然之下转向线上销售。2014年,京东市值已超400亿美元,跻身全球前十大互联网公司之列。 这是一个听起来很传奇的创业故事,但只有当事人了解创业维艰。 刚转向电商时,传统企业前景光明,而电商看起来前途未卜,京东如何能毅然转型并坚持到底?资金匮乏的时候,京东靠什么说服投资人?在强大的对手面前,京东靠什么反超并一路领先......一起来看看 《创京东》 这本书的介绍吧!

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具