内容简介:本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数。
简介
本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数。
交互过程概览
flink on yarn 的整个交互过程图,如下:
要使得 flink 运行于 yarn 上, flink 要能找到 hadoop 配置,因为要连接到 yarn 的 resourcemanager 和 hdfs 。可以使用下面的策略来指定 hadoop 配置:
1.会查看 YARN_CONF_DIR , HADOOP_CONF_DIR 或者 HADOOP_CONF_PATH 是否设置,按照顺序检查的。然后,假如配置了就会从该文件夹下读取配置。
2. 如果上面环境变量都没有配置的话,会使用 HADOOP_HOME 环境变量。对于 hadoop2 的话会查找的配置路径是 $HADOOP_HOME/etc/hadoop; 对于 hadoop1 会查找的路径是 $HADOOP_HOME/conf.
每当常见一个新 flink 的 yarn session 的时候,客户端会首先检查要请求的资源 (containers 和 memory) 是否可用。然后,将包含 flink 相关的 jar 包盒配置上传到 hdfs 。
接下来就是客户端会向 resourcemanager 申请一个 yarn container 用以启动 ApplicationMaster 。由于客户端已经将配置和 jar 文件注册为了 container 的资源,所以 nodemanager 会直接使用这些资源准备好 container (例如,下载文件等)。一旦该过程结束, AM 就被启动了。
Jobmanager 和 AM 运行于同一个 container 。一旦创建成功, AM 就知道了 Jobmanager 的地址。它会生成一个新的 flink 配置文件,这个配置文件是给将要启动的 taskManager 用的,该配置文件也会上传到 hdfs 。另外, AM 的 container 也提供了 Flink 的 web 接口。 Yarn 代码申请的端口都是临时端口,目的是为了让用户并行启动多个 Flink YARN Session 。
最后, AM 开始申请启动 Flink Taskmanager 的 containers ,这些 container 会从 hdfs 上下载 jar 文件和已修改的配置文件。一旦这些步骤完成, flink 就可以接受任务了。
部署启动yarn-session
这个就是 yarn-session 脚本启动的整个过程吧。
默认可以直接执行 bin/yarn-session.sh 默认启动的配置是
{masterMemoryMB=1024, taskManagerMemoryMB=1024,numberTaskManagers=1, slotsPerTaskManager=1}
需要自己自定义配置的话,可以使用来查看参数:
bin/yarn-session.sh –help
比如,我们启动一个 yarn-session 有 10 个 Taskmanager , 8GB 内存, 32 处理 slot ,那么脚本编写应该是这样的:
./bin/yarn-session.sh-n 10 -tm 8192 -s 32
系统默认使用 con/flink-conf.yaml 里的配置。 Flink onyarn 将会覆盖掉几个参数: jobmanager.rpc.address 因为 jobmanager 的在集群的运行位置并不是实现确定的,前面也说到了就是 am 的地址; taskmanager.tmp.dirs 使用 yarn 给定的临时目录 ;parallelism.default 也会被覆盖掉,如果在命令行里指定了 slot 数。
如果你想保证 conf/flink-conf.yaml 仅是全局末日配置,然后针对要启动的每一个 yarn-session.sh 都设置自己的配置,那么可以考虑使用 -D 修饰。
这种情况下启动完成 yarn-session.sh 会在会话窗口结尾
输入 stop 然后回车就会停掉整个应用。
官网说的是 CTRL+C 可以会在杀死 yarn-session.sh 的客户端的时候停止整个应用, max os 下实测,不行的。
假如要启动多个需要多个 shell 会话窗口,那么假如想在启动完 yarn-session.sh 脚本之后使其退出,那么只需要加上 -d 或者 -detached 参数即可。这种情况下,客户端在提交 flink 到集群之后就会退出,这个时候要停止该 yarn-session.sh 必须要用 yarn 的命令了 yarn application –kill <appid>
提交job到yarn-session
启动完 yarn-session 就是提交应用了,那么一个集群中可以存在多个 yarn-session 如何提交到自己的 yarn-session 呢?
其实,前面在讲 yarn-session 启动的时候应该强调一下那个叫做 -nm 的参数,这个就是给你的 yarn-session 起一个名字。比如
bin/yarn-session.sh -nm test
这样根据你的业务需求特点,可以自己起一个名字,然后就可以确定那个 yarn-session 可以用来提交 job 了。
当然,前面我们也说了
运行 bin/flink run –help 可以产看 flink 提交到 yarn 的相关参数其中有一个叫做
然后就可以提交任务了
./bin/flink run./examples/batch/WordCount.jar --input /input/test.txt --output/output/result.txt
假如只启动了一个 yarn-session 的话,那么就是他会找到默认的,否则的话就用 -m 参数指定了。
jobmanager 的地址也可以从下面页面查询。
用户依赖与classlpath
用户依赖管理还是有一定的注意事项的,默认情况下当单个job在运行的时 候flink会将用户jar包含进系统chasspath内部。该行为也可以通过 yarn.per-job-cluster.include-user-jar参数进行控制。
当将该参数设置为 DISABLED,flink会将jar放入到用户classpath里面(这里要强调一下,前面说的是系统classpath,而这里是用户classpath)。
用户jar的在classpath的位置顺序是由该参数的下面几个值决定的:
1).ORDER:(默认)按照字典顺序将jar添加到系统classpath里。
2).FIRST:将jar添加到系统classpath的开始位置。
3).LAST:将jar添加到系统classpath里的结束位置。
故障恢复
Flink的yarn客户端有一些配置可以控制在containers失败的情况下应该怎么做。可以在conf/flink-conf.yaml或者启动YARN session以-D形式指定。
yarn.reallocate-failed: 默认值是true,该参数控制flink是否会重新申请失败的taskmanager的container。
yarn.maximum-failed-containers: 在整个yarn-session挂掉之前,ApplicationMaster最大接受失败containers的数目。默认是最初请求的taskmanager数(-n)
yarn.application-attempts: yarn的applicationMaster失败后尝试的次数,如果此值设置为1,默认值,则当AM失败时,整个yarn session就失败了,所以该值可以设置为一个较大的值。
推荐阅读:
2019与近550位球友一起进步~
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 圣思园《精通Spring Boot/Cloud》与《精通Java并发》课程现已宣布
- Charles 从入门到精通
- MAT 入门到精通(一)
- Git 从入门到精通
- Webpack入门到精通(1)
- 爬虫入门到精通-网页的下载
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。