flink

栏目: 编程工具 · 发布时间: 6年前

内容简介:官方:基于yarn的部署

官方: https://flink.apache.org

逻辑架构

flink

部署架构

基于yarn的部署

flink

HA:job manager单点

Standalone : Zookeeper

对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。

运行架构

flink

client: 当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Flink程序=》JobGraph(Flink Dataflow:多个JobVertex组成的DAG,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex等)。

jobmanager:它负责接收Flink Job,调度组成Job的多个Task的执行。同时,JobManager还负责收集Job的状态信息,并管理Flink集群中从节点TaskManager。JobManager所负责的各项管理功能,它接收到并处理的事件主要包括:

RegisterTaskManager,SubmitJob,CancelJob,UpdateTaskExecutionState,RequestNextInputSplit,JobStatusChanged

worker JVM进程多线程,task slot内存隔离资源单位,一个job的的多讴歌subtask可以共享slot,

计算模式

在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,并且都会去占用计算资源。对 Flink 来说 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源

flink

https://ci.apache.org/project...

以上有4个源4个map3个reduce。在2个TM(每个3个slots)的并行执行方式如下

flink

其中每个可并行的有一个JV和并行和EV.比如source会在一个JV中保含4个EV,ExecutionGraph还包含IntermediateResult和IntermediateResultPartition。前者跟踪IntermediateDataSet的状态,后者是每个分区的状态。

flink

窗口与时间

倾斜窗口(Tumbling Windows,记录没有重叠)、滑动窗口(Slide Windows,记录有重叠)、会话窗口(Session Windows)

基于时间、数据

基于事件时间(事件创建时间)的水位线watermark算法:

flink

当1、watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)

&& 2、在[window_start_time,window_end_time)中有数据存在 时窗口关闭开始计算

如下图:设定的maxOutOfOrderness=10000L(10s),窗口3s

flink

  • 定期水位线

    用户定义maxOutOfOrderness,两次水位线之间的数据可以用来调用方法生成下一次的时间,再往后推迟maxOutOfOrderness的时间即可。比如

    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
        val maxOutOfOrderness = 3500L; // 3.5 seconds
        var currentMaxTimestamp: Long;
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp;
        }
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
  • 标点水位线
    数据流中有标记事件才调用extractTimestamp生成新的wartermark
  • 对于map等,是输入流事件时间的最小时间
  • 迟到事件:
    重新激活已经关闭的窗口并重新计算以修正结果。要保存上次结果重新计算,可能每个迟到事件都要触发。
    将迟到事件收集起来另外处理。直接返回收集结果
    将迟到事件视为错误消息并丢弃。

容错

快照: https://arxiv.org/pdf/1506.08...

源于Chandy-Lamport算法 https://lamport.azurewebsites...

https://ci.apache.org/project...

流障碍被注入流源的并行数据流中,它会将快照n的屏障发送到其所有输出流中。一旦接收器操作员(流式DAG的末端)从其所有输入流接收到障碍n,它就向快照n确认检查点协调器。在所有接收器确认快照后,它被视为已完成。一旦完成了快照n,作业将永远不再向源请求来自Sn之前的记录,因为此时这些记录(及其后代记录)将通过整个数据流拓扑。

完全一次调的保证: 对其 (google的millwheel用的每个数据生成唯一编号,dedup去重实现exactly-once(milwheel)) 接收到一个流的n后,这个流的数据暂存,直到其他流也到n,对其发出快照

状态也要存储(转换函数,系统窗口数据缓冲区等等),信息很大,单独state backend存储,可存储在HDFS中(选项有内存,rocksdb等)

flink

内部优化

避免特定情况下Shuffle、 排序 等昂贵操作,中间结果有必要进行缓存

迭代计算

机器学习和图计算使用

https://ci.apache.org/project...

普通迭代+增量迭代

flink


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Applications (Hacking Exposed)

Web Applications (Hacking Exposed)

Joel Scambray、Mike Shema / McGraw-Hill Osborne Media / 2002-06-19 / USD 49.99

Get in-depth coverage of Web application platforms and their vulnerabilities, presented the same popular format as the international bestseller, Hacking Exposed. Covering hacking scenarios across diff......一起来看看 《Web Applications (Hacking Exposed)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具