WeChat official account: 深广大数据Club
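This article describes how to deploy Apache Flink on YARN (that is, how to run Flink jobs on YARN), using HDP 2.6.5 and Apache Flink 1.7.2.
In the Hadoop ecosystem, YARN handles resource management and job scheduling, which gives better scheduling and control over cluster resources.
Installing HDP itself is not covered here. If you need to install HDP, follow the installation guide on the HDP website.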
The official QuickStart documentation describes two ways to start Flink:
- Start a long-running Flink cluster on YARN (a YARN session)
- Run a Flink job directly on YARN
Before walking through these two modes, we first cover installing Flink on YARN on top of HDP.
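Installation
Download the Apache Flink package
Download the package for your version from the Apache Flink download page (http://flink.apache.org/downloads.html) and extract it. Substitute the download URL and archive name for the release you actually downloaded:
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz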
Integrating with Hadoop
Flink on YARN has to be configured against the Hadoop cluster by setting HADOOP_CONF_DIR and HADOOP_CLASSPATH.
Add the following to the ~/.bash_profile configuration file:
$ vi ~/.bash_profile
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*"
Source ~/.bash_profile to load the variables, then check that they are set correctly:
source ~/.bash_profile
echo $HADOOP_CONF_DIR
echo $HADOOP_CLASSPATH
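A misspelled variable name or path is easy to miss when you only eyeball the echo output. The sketch below is a hypothetical helper, check_hadoop_env, which is not part of Flink or HDP. It checks that HADOOP_CONF_DIR is a directory and that every HADOOP_CLASSPATH entry exists, after stripping a trailing /* wildcard. Instead of hard-coding the classpath, the Flink Hadoop integration page listed in the references also suggests export HADOOP_CLASSPATH=$(hadoop classpath).

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink/HDP): sanity-check the variables
# Flink on YARN reads. Prints each problem to stderr; returns 1 if any found.
check_hadoop_env() {
  if [ -z "${HADOOP_CONF_DIR:-}" ] || [ ! -d "$HADOOP_CONF_DIR" ]; then
    echo "HADOOP_CONF_DIR is unset or not a directory" >&2
    return 1
  fi
  if [ -z "${HADOOP_CLASSPATH:-}" ]; then
    echo "HADOOP_CLASSPATH is unset" >&2
    return 1
  fi
  local -a entries
  local entry path status=0
  # Split the classpath on ':' without expanding the '*' wildcards.
  IFS=':' read -r -a entries <<< "$HADOOP_CLASSPATH"
  for entry in "${entries[@]}"; do
    path=${entry%"/*"}     # ".../lib/*" -> ".../lib"; jar paths stay as-is
    if [ ! -e "$path" ]; then
      echo "classpath entry not found: $entry" >&2
      status=1
    fi
  done
  return "$status"
}
```

Typical use is source ~/.bash_profile && check_hadoop_env && echo "Hadoop environment looks OK".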
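Configuration
Configuring yarn-session.sh
On HDP, Hadoop jobs run and HDFS is accessed as the hdfs user. We therefore set the HADOOP_USER_NAME variable before the YARN session is launched. Otherwise Flink fails to start because of permission errors.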
$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh
#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi
export HADOOP_USER_NAME=hdfs
JVM_ARGS="$JVM_ARGS -Xmx512m"
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
export FLINK_CONF_DIR
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
Note: the HADOOP_USER_NAME export must come before the $JAVA_RUN line. Otherwise the launched program cannot read the variable.
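The ordering matters because a process only inherits environment variables that were exported before it started. The sketch below demonstrates this with a child bash process standing in for the JVM. The function name show_child_env is hypothetical. There is also an alternative that leaves yarn-session.sh untouched, assuming the cluster uses simple (non-Kerberos) authentication, where Hadoop honors HADOOP_USER_NAME. Run export HADOOP_USER_NAME=hdfs in your shell, or add that line to ~/.bash_profile, before calling the script.

```shell
#!/usr/bin/env bash
# Demonstrates why HADOOP_USER_NAME must be exported before $JAVA_RUN runs:
# a child process only inherits variables exported before it is started.
show_child_env() {
  # Stand-in for the JVM: a child bash that prints the value it inherited.
  bash -c 'printf "%s\n" "${HADOOP_USER_NAME:-<unset>}"'
}

unset HADOOP_USER_NAME
before=$(show_child_env)      # child started before the export
export HADOOP_USER_NAME=hdfs
after=$(show_child_env)       # child started after the export

printf 'before export: %s\nafter export:  %s\n' "$before" "$after"
```

The first launch prints <unset> and the second prints hdfs. This is the same difference the Flink client sees depending on where the export sits in the script.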
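conf/flink-conf.yaml (HA configuration)
To start an HA cluster, add the following settings to conf/flink-conf.yaml:
High-availability mode (required): high-availability must be set to zookeeper in conf/flink-conf.yaml to enable high-availability mode. Alternatively, this option can be set to the fully qualified name (FQN) of the factory class Flink should use to create the HighAvailabilityServices instance.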
high-availability: zookeeper
ZooKeeper quorum (required): a ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service.
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
Each addressX:port refers to a ZooKeeper server that Flink can reach at the given address and port.
ZooKeeper root (recommended): the root ZooKeeper node, under which all cluster nodes are placed.
high-availability.zookeeper.path.root: /flink
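ZooKeeper cluster-id (recommended): the cluster-id ZooKeeper node, under which all required coordination data for a cluster is placed. According to the Flink HA documentation, you should not set this manually when running on YARN, because a cluster-id is generated automatically from the YARN application ID. The configuration used below does not set it.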
high-availability.cluster-id: /default_ns # important: customize per cluster
Storage directory (required): JobManager metadata is persisted in the file system under storageDir. Only a pointer to this state is stored in ZooKeeper.
high-availability.storageDir: hdfs:///flink/recovery
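storageDir stores all the metadata needed to recover from a JobManager failure.
After configuring the masters and the ZooKeeper quorum, you can use the provided cluster start scripts as usual, and they will start an HA cluster. Keep in mind that the ZooKeeper quorum must be running when you call the scripts. Also make sure to configure a separate ZooKeeper root path for each HA cluster you start.
In addition to the HA configuration, configure the maximum number of application attempts in conf/flink-conf.yaml: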
yarn.application-attempts: 10
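This means the application can be restarted 9 times before YARN fails it (9 retries + 1 initial attempt).
Our Hadoop cluster was built with HDP, so a ZooKeeper ensemble is already available. We reuse it for the HA configuration: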
high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10
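If you apply these settings on several machines or from a provisioning script, an idempotent edit avoids duplicate keys. The helper below, set_flink_conf, is a hypothetical sketch. It assumes one key: value pair per line, as in the flink-conf.yaml shipped with Flink. An existing uncommented key is replaced in place, and a missing key is appended. Commented-out defaults are left alone.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set "key: value" in a flink-conf.yaml-style file.
# An existing uncommented "key:" line is replaced in place; otherwise the pair
# is appended. Commented-out lines such as "# high-availability: zookeeper"
# are left untouched.
set_flink_conf() {
  local file=$1 key=$2 value=$3 tmp rc=0
  [ -f "$file" ] || : > "$file"      # create the file if it does not exist
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    { line = $0; sub(/^[ \t]+/, "", line) }          # ignore leading blanks
    index(line, k ":") == 1 { print k ": " v; found = 1; next }
    { print }
    END { if (!found) print k ": " v }
  ' "$file" > "$tmp" && cat "$tmp" > "$file" || rc=1  # cat keeps file permissions
  rm -f "$tmp"
  return "$rc"
}
```

For example, set_flink_conf conf/flink-conf.yaml yarn.application-attempts 10. Note that awk -v interprets backslash escapes, so values containing backslashes would need extra escaping.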
Configuring Hadoop yarn-site.xml
Set the maximum number of application master attempts:
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>
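The default for current YARN versions is 2 (meaning a single JobManager failure is tolerated). YARN treats this property as a global upper bound: according to yarn-default.xml, the ResourceManager overrides a larger per-application value. With the values used here (yarn.application-attempts: 10 in Flink, 4 in YARN), the effective limit is therefore 4 attempts.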
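The sketch below computes the effective number of attempts. It assumes, as the yarn-default.xml description of yarn.resourcemanager.am.max-attempts states, that YARN caps any larger per-application request at its own maximum. effective_attempts is a hypothetical name. Neither Flink nor YARN provides such a function.

```shell
#!/usr/bin/env bash
# Sketch: effective number of application (JobManager) attempts on YARN,
# assuming yarn.resourcemanager.am.max-attempts caps yarn.application-attempts.
effective_attempts() {
  local flink_attempts=$1 yarn_max_attempts=$2
  if [ "$flink_attempts" -gt "$yarn_max_attempts" ]; then
    echo "$yarn_max_attempts"
  else
    echo "$flink_attempts"
  fi
}

attempts=$(effective_attempts 10 4)   # the values used in this article
echo "effective attempts: $attempts (restarts: $((attempts - 1)))"
```

To actually get the 9 restarts that yarn.application-attempts: 10 asks for, the YARN-side maximum must be at least 10.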
Fixing java.lang.IllegalAccessError: tried to access method
On HDP you need to remove Flink's shaded Hadoop uber jar and add the MapReduce jars to the YARN application classpath. Otherwise the following error occurs:
Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
Delete or rename the jar
rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
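Renaming is easier to undo than rm -f. The sketch below uses a hypothetical helper, disable_shaded_hadoop. It moves any flink-shaded-hadoop2-uber-*.jar out of lib/ into a sibling lib.disabled directory, which Flink does not scan for jars. The Flink install directory (/root/flink-1.7.1 in this article) is passed as the first argument.

```shell
#!/usr/bin/env bash
# Hypothetical helper: move Flink's shaded Hadoop uber jar(s) out of lib/
# into <flink_home>/lib.disabled so the change can be reverted with mv.
disable_shaded_hadoop() {
  local flink_home=$1 jar moved=0
  local backup="$flink_home/lib.disabled"
  mkdir -p "$backup" || return 1
  for jar in "$flink_home"/lib/flink-shaded-hadoop2-uber-*.jar; do
    [ -e "$jar" ] || continue        # the glob matched nothing
    mv "$jar" "$backup"/ && moved=$((moved + 1))
  done
  echo "moved $moved jar(s) to $backup"
}
```

Usage: disable_shaded_hadoop /root/flink-1.7.1. To undo, move the jar from lib.disabled back into lib.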
Add the MapReduce jars to the YARN application classpath
In the Ambari UI, go to Services > YARN > Configs > Advanced > Advanced yarn-site and append the following to yarn.application.classpath:
/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*
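After the change, restart the affected YARN components. Ambari shows which components need restarting and can restart them in one click.
Flink logging configuration
Flink ships with two logging configurations: log4j and logback.
If both are left in place, starting a Flink cluster or submitting a Flink job logs a warning recommending that you remove one of them: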
org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
Either deleting or renaming one of the files works.
For example, in Flink's conf directory:
$ mv logback.xml logback.xml_bak
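When preparing several installations, the same fix can be scripted. The sketch below is a hypothetical helper, keep_log4j_only. It renames logback.xml only when log4j.properties is also present in the given conf directory, which is the combination the warning above reports.

```shell
#!/usr/bin/env bash
# Hypothetical helper: if a Flink conf directory contains both
# log4j.properties and logback.xml, keep log4j and rename logback.xml.
keep_log4j_only() {
  local conf_dir=$1
  if [ -f "$conf_dir/log4j.properties" ] && [ -f "$conf_dir/logback.xml" ]; then
    mv "$conf_dir/logback.xml" "$conf_dir/logback.xml_bak" || return 1
    echo "renamed $conf_dir/logback.xml to logback.xml_bak"
  fi
}
```

Usage: keep_log4j_only /root/flink-1.7.1/conf. Running it a second time does nothing, because logback.xml no longer exists.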
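Example configuration. In a .properties file, # starts a comment only at the beginning of a line, so comments go on their own lines rather than after a value. MaxFileSize and MaxBackupIndex only take effect with a RollingFileAppender, while the default Flink log4j.properties uses a plain FileAppender:
vi /usr/local/flink-1.3.3/conf/log4j.properties
# rolling appender, required for MaxFileSize/MaxBackupIndex
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.append=true
# maximum size of each log file (log4j 1.x expects a KB/MB/GB suffix)
log4j.appender.file.MaxFileSize=100MB
# maximum number of rolled-over backup files to keep
log4j.appender.file.MaxBackupIndex=10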
Starting Flink
This section covers the two ways to start Flink.
Starting a long-running Flink cluster
A long-running Flink cluster is deployed with yarn-session.sh.
yarn-session.sh usage
$ ./bin/yarn-session.sh
Usage:
   Required
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
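Key options:
1. -n: number of TaskManagers
2. -jm: memory for the JobManager
3. -m: address of the JobManager to connect to
4. -tm: memory per TaskManager
5. -D: dynamic properties (set a configuration value)
6. -d: detached mode. Once the YARN session has been deployed to YARN, the client exits on its own.
7. -j: path to the Flink jar file (the flink-dist jar, not your job jar)
8. -s: number of slots per TaskManager
9. -nm: custom application name on YARN
Deploying a long-running Flink on YARN instance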
bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job
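What this example requests:
- 8 TaskManagers
- 5 slots per TaskManager
- 4 GB (4096 MB) of memory per TaskManager
- 2 GB (2048 MB) of memory for the JobManager
- the application name pinpoint-flink-job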
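The numbers combine as follows. The slot count 8 × 5 = 40 matches the "YARN properties set default parallelism to 40" line in the job-submission log later in this article. The memory figure is a rough sketch. It assumes one YARN container per TaskManager plus one for the JobManager, all of them allocated. YARN may round each container up to a multiple of yarn.scheduler.minimum-allocation-mb, so actual usage can be higher.

```shell
#!/usr/bin/env bash
# Rough sizing for: bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096
# Assumes one container per TaskManager plus one for the JobManager, all
# allocated; YARN may round container sizes up, so real usage can be higher.
n=8       # -n  number of TaskManagers
s=5       # -s  slots per TaskManager
jm=2048   # -jm JobManager memory in MB
tm=4096   # -tm memory per TaskManager in MB

total_slots=$((n * s))              # 8 * 5 = 40
total_mem_mb=$((jm + n * tm))       # 2048 + 8 * 4096 = 34816
echo "total slots: $total_slots"
echo "container memory requested: ${total_mem_mb} MB"
```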
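Note: right after a long-running Flink on YARN instance is deployed, the Flink web UI shows 0 TaskManagers and 0 slots. Resources are allocated, according to what the job needs, only when a job is submitted.
Submitting a job to the long-running Flink on YARN instance
Run the submit command: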
$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt
Input file: hdfs://xdata2/tmp/LICENSE-2.0.txt
Output file: hdfs://xdata2/tmp/wordcount_result.txt
The command prints the following log:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms
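The first lines of this log show how bin/flink run found the session. yarn-session.sh leaves a properties file behind (here /tmp/.yarn-properties-root) that holds the session's application ID and default parallelism. The hypothetical helper below, read_yarn_prop, reads one key from such a file. The key names applicationID and parallelism are our assumption about the Flink 1.7 format, so check the file on your own machine first.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of one key from a Java-style
# properties file such as /tmp/.yarn-properties-$USER. Comment lines
# (starting with # or !) are skipped; prints nothing if the key is absent.
read_yarn_prop() {
  local file=$1 key=$2
  awk -F= -v k="$key" '!/^[#!]/ && $1 == k { sub(/^[^=]*=/, ""); print; exit }' "$file"
}
```

For example, read_yarn_prop /tmp/.yarn-properties-root applicationID shows which YARN application bin/flink run will attach to.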
After submission, the job and its progress can be seen in the Flink web UI.
Checking the job result
Use the hadoop command to view the output:
[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...
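To list the most frequent words instead of the alphabetical output, pipe the result through sort. top_words is a hypothetical shell function. It assumes the "word count" line format shown above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "word count" lines on stdin and print the N most
# frequent words (default 10), highest count first, ties in byte order.
top_words() {
  LC_ALL=C sort -k2,2nr -k1,1 | head -n "${1:-10}"
}
```

Usage: hadoop fs -cat /tmp/wordcount_result.txt | top_words 5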
Running a single Flink job on YARN
If you want to start Flink on YARN just to run one job, you can do so directly with bin/flink run.
Example:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
The YARN session's command-line options are also available to ./bin/flink, prefixed with y (for short options) or yarn (for long options).
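The prefix rule is mechanical enough to sketch. to_flink_run_opt below is a hypothetical helper that rewrites yarn-session.sh options into the bin/flink run form. Not every option necessarily follows the rule; for instance, -m and -d also exist as generic bin/flink run options. Treat the output as a starting point and confirm the result against ./bin/flink run -h.

```shell
#!/usr/bin/env bash
# Hypothetical helper illustrating the naming rule: a yarn-session.sh option
# becomes a bin/flink run option by prefixing "y" (short) or "yarn" (long).
to_flink_run_opt() {
  case $1 in
    --*) printf '%s\n' "--yarn${1#--}" ;;   # --container -> --yarncontainer
    -*)  printf '%s\n' "-y${1#-}" ;;        # -n -> -yn, -tm -> -ytm
    *)   printf '%s\n' "$1" ;;              # option values pass through unchanged
  esac
}

# Translate the options of "yarn-session.sh -n 2 -s 4 -tm 2048".
args=()
for a in -n 2 -s 4 -tm 2048; do
  args+=("$(to_flink_run_opt "$a")")
done
printf '%s\n' "./bin/flink run -m yarn-cluster ${args[*]} ./examples/batch/WordCount.jar"
```

The last line prints ./bin/flink run -m yarn-cluster -yn 2 -ys 4 -ytm 2048 ./examples/batch/WordCount.jar, which extends the -yn 2 example above with slots and TaskManager memory.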
When the command runs, YARN starts a separate Flink on YARN instance dedicated to this job, and the job appears in the Flink web UI.
The job's result:
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...
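Summary
Choose between the two Flink on YARN deployment modes based on your needs. You can use either one on its own or combine them.
We recommend running important jobs in their own dedicated instance. Other jobs can share a long-running session, and resources that are not in use are released.
Standalone mode will be covered in a later article.
References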
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html