内容简介:Flink
Flink 运行时主要角色有两个: JobManager 和 TaskManager ,无论是 standalone 集群, on yarn 都是要启动这两个角色。有点类似于 MRv1 的架构了, JobManager 主要是负责接受客户端的 job ,调度 job ,协调 checkpoint 等。 TaskManager 执行具体的 Task 。 TaskManager 为了对资源进行隔离和增加允许的 task 数,引入了 slot 的概念,这个 slot 对资源的隔离仅仅是对内存进行隔离,策略是均分,比如 taskmanager 的管理内存是 3GB ,假如有三个 slot ,那么每个 slot 就仅仅有 1GB 内存可用。
根据经验, taskslot 数最佳默认值就是 CPU 核心数。使用超线程,每个 task slot 需要 2 个或更多硬件线程上下文。
Client 这个角色主要是为 job 提交做些准备工作,比如构建 jobgraph 提交到 jobmanager ,提交完了可以立即退出,当然也可以用 client 来监控进度。
Jobmanager 和 TaskManager 之间通信类似于 Spark 的早期版本,采用的是 actor 系统。
根据以上描述,绘制出运行架构图就是下图:
Task 到底是什么玩意?
讲到这可以先回顾一下 Spark 了,主要三个概念:
1. Shuffle
Spark 任务 job 中 shuffle 个数决定着 stage 个数。
2. 分区
Spark 算子中 RDD 的分区数决定者 stage 任务的并行度。
3. 分区传递
复杂的入 union , join 等暂不提。简单的调用链如下:
rdd.map-->filter-->reducebykey-->map。
例子中假设 rdd 有 6 个分区, map 到 fliter 的分区数传递是不变, filter 到 redcuebykey 分区就变了, reducebykey 的分区有个默认计算公式,星球里讲过了,假设我们在使用 reducebykey 的时候传入了一个分区数 12 。
分区数, map 是 6 , filter 也是 6 , reducebykey 后面的 map 就是 12 。
override def getPartitions: Array[Partition] =firstParent[T].partitions
map 这类转换完全继承了父 RDD 的分区器和分区数,默认无法人为设置并行度,只有在 shuffle 的时候,我们才可以传入并行度。
上述讲解主要是想带着大家搞明白,以下几个概念:
-
Flink 的并行度由什么决定的?
-
Flink 的 task 是什么?
1. Flink 的并行度由什么决定的?
这个很简单, Flink 每个算子都可以设置并行度,然后就是也可以设置全局并行度。
Api 的设置
.map(new RollingAdditionMapper()).setParallelism(10)
全局配置在 flink-conf.yaml 文件中, parallelism.default ,默认是 1 :
2. Flink 的 task 是什么?
按理说应该是每个算子的一个并行度实例就是一个 subtask- 在这里为了区分暂时叫做 substask 。那么,带来很多问题,由于 flink 的 taskmanager 运行 task 的时候是每个 task 采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。
为了减轻这种情况, flink 进行了优化,也即对 subtask 进行链式操作,链式操作结束之后得到的 task ,再作为一个调度执行单元,放到一个线程里执行。
如下图的, source/map 两个算子进行了链式; keyby/window/apply 有进行了链式, sink 单独的一个。
注释 :图中假设是 source/map 的并行度都是 2 , keyby/window/apply 的并行度也都是 2 , sink 的是 1 ,总共 task 有五个,最终需要五个线程。
按照到这一步的理解,画的执行图应该是这样的:
有些朋友该说了,据我观察实际上并不是这样的呀。。。
这个是实际上是 flink 又一次优化。
默认情况下, flink 允许如果任务是不同的 task 的时候,允许任务共享 slot ,当然,前提是必须在同一个 job 内部。
结果就是,每个 slot 可以执行 job 的一整个 pipeline ,如上图。这样做的好处主要有以下几点:
1. Flink 集群所需的 taskslots 数与 job 中最高的并行度一致。 也就是说我们不需要再去计算一个程序总共会起多少个 task 了。
2. 更容易获得更充分的资源利用 。如果没有 slot 共享,那么非密集型操作 source/flatmap 就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有 slot 共享,将基线的 2 个并行度增加到 6 个,能充分利用 slot 资源,同时保证每个 TaskManager 能平均分配到重的 subtasks ,比如 keyby/window/apply 操作就会均分到申请的所有 slot 里,这样 slot 的负载就均衡了。
链式的原则,也即是什么情况下才会对task进行链式操作呢?简单梗概一下:
-
上下游的并行度一致
-
下游节点的入度为 1 (也就是说下游节点没有来自其他节点的输入)
-
上下游节点都在同一个 slot group 中(下面会解释 slot group )
-
下游节点的 chain 策略为 ALWAYS (可以与上下游链接, map 、 flatmap 、 filter 等默认是 ALWAYS )
-
上游节点的 chain 策略为 ALWAYS 或 HEAD (只能与下游链接,不能与上游链接, Source 默认是 HEAD )
-
两个节点间数据分区方式是 forward (参考理解数据流的分区)
-
用户没有禁用 chain
推荐阅读:
欢迎点赞,转发,给自己小伙伴们学习的机会。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 代理模式——结合SpringAOP讲解
- 如何结合 Scrum 和 Kanban
- NServiceBus 结合 RabbitMQ 使用教程
- ActiveMQ结合Spring收发消息
- quicklink学习以及结合React
- 业务流程与软件架构的结合
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。