This article describes how to deploy Apache Flink on YARN, i.e. how to run Flink jobs on YARN, using HDP 2.6.5 and Apache Flink 1.7.2.
In the Hadoop ecosystem, YARN takes the role of resource management and job scheduling, giving you finer control over cluster resources.
HDP installation is not covered here; if you need to install HDP, follow the installation guide on the HDP website.
The official QuickStart documentation describes two ways to launch Flink:
- Start a long-running Flink cluster on YARN (a YARN session)
- Run a Flink job on YARN (submit a single job directly)
Before going through these two modes, let's first cover installing Flink on YARN on top of HDP.
Installation
Download the Apache Flink distribution
Download the matching release from the Apache Flink download page (http://flink.apache.org/downloads.html) and unpack it:
$ curl -O <flink_hadoop2_download_url>
$ tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz
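As a concrete, hedged example for the Flink 1.7.2 release used in this article (the exact bundle name, here the Hadoop 2.8 / Scala 2.11 binary from the Apache archive, is an assumption; check the download page for your Hadoop version):
# download the Flink 1.7.2 binary built against Hadoop 2.8 and unpack it
$ curl -O https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.11.tgz
$ tar xvzf flink-1.7.2-bin-hadoop28-scala_2.11.tgz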
Integrating with Hadoop
In Flink on YARN mode, Flink has to be wired up with the Hadoop cluster by setting HADOOP_CONF_DIR and HADOOP_CLASSPATH.
Add the following to the ~/.bash_profile configuration file:
$ vi ~/.bash_profile
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*"
Source ~/.bash_profile to load the variables and verify that they are set correctly:
$ source ~/.bash_profile
$ echo $HADOOP_CONF_DIR
$ echo $HADOOP_CLASSPATH
Configuration
yarn-session.sh configuration
Because HDP runs Hadoop jobs and accesses HDFS as the hdfs user, we need to set the HADOOP_USER_NAME variable before the YARN session starts; otherwise Flink fails to start due to permission errors.
$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh
#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
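# Added for HDP: submit the YARN session as the hdfs user so Flink can write its files to HDFS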
export HADOOP_USER_NAME=hdfs
JVM_ARGS="$JVM_ARGS -Xmx512m"
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
export FLINK_CONF_DIR
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
Note: HADOOP_USER_NAME must be exported before the JAVA_RUN line; otherwise the environment variable is not visible to the program at runtime.
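The same mechanism can be tested outside of Flink: on a non-kerberized cluster, prefixing a single command with the variable makes Hadoop act as that user. A hedged sketch (the path is illustrative):
# run one HDFS command as the hdfs user to confirm impersonation works
$ HADOOP_USER_NAME=hdfs hadoop fs -ls /user/hdfs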
conf/flink-conf.yaml (HA configuration)
To start an HA cluster, add the following settings to conf/flink-conf.yaml.
High-availability mode (required): the high-availability mode has to be set to zookeeper in conf/flink-conf.yaml to enable high availability. Alternatively, this option can be set to the FQN of a factory class that Flink should use to create the HighAvailabilityServices instance.
high-availability: zookeeper
ZooKeeper quorum (required): the ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service.
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
Each addressX:port refers to one ZooKeeper server that Flink can reach at the given address and port.
ZooKeeper root (recommended): the root ZooKeeper node under which all cluster nodes are placed.
high-availability.zookeeper.path.root: /flink
ZooKeeper cluster-id (recommended): the cluster-id ZooKeeper node, under which all required coordination data of a cluster is placed.
high-availability.cluster-id: /default_ns # important: customize per cluster
Storage directory (required): JobManager metadata is persisted in the file system storageDir; only a pointer to this state is stored in ZooKeeper.
high-availability.storageDir: hdfs:///flink/recovery
storageDir holds all metadata needed to recover a JobManager failure.
After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual; they will start an HA cluster. Keep in mind that the ZooKeeper quorum must be running when you call the scripts, and make sure to configure a separate ZooKeeper root path for every HA cluster you start.
In addition to the HA settings, the maximum number of application attempts has to be configured in conf/flink-conf.yaml:
yarn.application-attempts: 10
This means the application can be restarted 9 times before YARN fails it (9 retries + 1 initial attempt).
Since our Hadoop cluster was built with HDP and already ships a ZooKeeper ensemble, we reuse the existing ZooKeeper for the HA configuration:
high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10
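After the HA cluster is up, a quick hedged sanity check is to list the HA root in ZooKeeper and the recovery directory in HDFS (the zkCli.sh location below is the usual HDP client path and may differ on your cluster):
# list the Flink HA znode on the existing ZooKeeper ensemble
$ /usr/hdp/current/zookeeper-client/bin/zkCli.sh -server flink-dc-01:2181 ls /flink
# confirm the JobManager recovery directory exists in HDFS
$ hadoop fs -ls hdfs://ns1/flink/recovery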
Configure Hadoop yarn-site.xml
Set the maximum number of ApplicationMaster attempts for an application:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
The default in current YARN versions is 2 (i.e., a single JobManager failure is tolerated). Note that this setting is a cluster-wide upper bound: Flink's yarn.application-attempts cannot exceed it.
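A quick way to confirm the effective limit on a cluster node is to read it out of the client configuration (a hedged sketch assuming the standard HDP config directory; the -A1 window assumes the value sits on the line after the property name):
$ grep -A1 'yarn.resourcemanager.am.max-attempts' /etc/hadoop/conf/yarn-site.xml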
The java.lang.IllegalAccessError: tried to access method problem
On HDP you must remove the uber-shaded Hadoop jar from Flink's lib directory and add the MapReduce jars to the YARN application classpath; otherwise the following error occurs:
Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
Delete or rename the jar:
rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
Add the MapReduce jars to the YARN application classpath
In the Ambari UI, navigate to Services -> YARN -> Configs -> Advanced -> Advanced yarn-site -> yarn.application.classpath and append:
/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*
After the change, the affected YARN components must be restarted; the Ambari UI indicates which components to restart and handles it in one click.
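Once YARN is back up, you can verify that the classpath change took effect (a hedged check; yarn classpath prints the expanded application classpath as seen by the local client):
$ yarn classpath | tr ':' '\n' | grep mapreduce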
Flink logging configuration
Flink ships with two logging setups by default: log4j and logback.
If both configuration files are left in place, starting a Flink cluster or running a Flink job prints a warning suggesting you remove one of them:
org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
Either deleting or renaming one of the files works. For example:
$ mv logback.xml logback.xml_bak
Sample configuration:
$ vi /usr/local/flink-1.3.3/conf/log4j.properties
log4j.appender.file.append=true
# maximum size of a single log file (log4j expects the MB suffix)
log4j.appender.file.MaxFileSize=100MB
# maximum number of rolled-over backup files to keep
log4j.appender.file.MaxBackupIndex=10
Starting Flink
This section walks through Flink's two launch modes.
Start a long-running Flink cluster
A long-running Flink cluster is deployed with yarn-session.sh.
yarn-session.sh usage
$ ./bin/yarn-session.sh
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
Key parameters explained:
1. -n: number of TaskManagers to allocate
2. -jm: memory for the JobManager
3. -m: address of the JobManager to connect to
4. -tm: memory per TaskManager
5. -D: dynamic properties
6. -d: detached mode; after the YARN session is deployed, the client disconnects and exits
7. -j: path to the Flink jar file
Deploy a long-running Flink on YARN instance
bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job
What this command does:
- allocates 8 TaskManagers
- 5 slots per TaskManager
- 2048 MB of memory for the JobManager
- 4096 MB of memory per TaskManager
- names the YARN application pinpoint-flink-job
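In practice you will usually add -d so that the session survives the client exiting; a detached session is later stopped through YARN itself. A hedged sketch (the application id is illustrative, taken from the logs further below):
# deploy the same session detached, then kill it when no longer needed
$ bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job -d
$ yarn application -kill application_1548213441093_0011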
Note: after deploying a long-running Flink on YARN instance, the Flink web UI shows 0 TaskManagers and 0 slots. Resources are only allocated to a job at the moment it is submitted.
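You can also confirm the session is running from the command line; Flink registers its YARN application under the type "Apache Flink" (a hedged check):
$ yarn application -list -appTypes "Apache Flink"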
Submit a job to the long-running Flink on YARN instance
Run the job submission command:
$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt
Input file: hdfs://xdata2/tmp/LICENSE-2.0.txt
Output file: hdfs://xdata2/tmp/wordcount_result.txt
The command logs output like the following:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms
After submission, the job and its execution status can be seen in the Flink web UI.
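The same information is available from the CLI; bin/flink list picks up the running YARN session from the properties file under /tmp and prints running and scheduled jobs (a hedged sketch):
$ bin/flink list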
Check the job result
Use the hadoop command to inspect the result:
[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...
Run a single Flink job on YARN
If you want to start Flink on YARN only for the duration of a single job, you can do so directly with bin/flink run.
Example:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
The command-line options of the YARN session are also available to ./bin/flink, prefixed with y (or yarn for the long option names).
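For example, a hedged sketch of a single-job submission that also sizes the per-job cluster through y-prefixed options (the application name wordcount-single is just an illustration):
$ ./bin/flink run -m yarn-cluster -yn 2 -ys 2 -yjm 1024 -ytm 2048 -ynm wordcount-single ./examples/batch/WordCount.jar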
When such a command runs, YARN starts a dedicated Flink on YARN instance just for this job; the job also shows up in the Flink web UI.
The WordCount result is printed to stdout:
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...
Summary
Choose between the two Flink on YARN deployment modes according to your needs; you can use one exclusively or combine both.
For critical jobs, running a dedicated per-job instance is recommended; the remaining jobs can share a long-running session, and resources are released once they are no longer needed.
Standalone mode will be covered in a follow-up article.
References
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html