Deploying an Apache Flink on YARN High-Availability (HA) Cluster

Category: Programming Tools · Published: 6 years ago


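WeChat official account: 深广大数据Club

Follow the account for more big-data news. For questions or suggestions, please leave a message on the account.

If you find 深广大数据Club helpful, feel free to share it on WeChat Moments.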

This article describes how to deploy Apache Flink on YARN (that is, how to run Flink jobs on YARN), using HDP 2.6.5 and Apache Flink 1.7.2.

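In the Hadoop ecosystem, YARN is responsible for resource management and job scheduling, which gives better control over how cluster resources are allocated.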

HDP installation is not covered here. If you need to install HDP, follow the installation guide on the official HDP website.

The QuickStart in the official documentation describes two ways to start Flink:

  • Start a YARN session (Start a long-running Flink cluster on YARN)

  • Run a Flink job directly on YARN (Run a Flink job on YARN)

Before covering the two modes, we first walk through installing Flink on YARN on top of HDP.

Installation

Download the Apache Flink package

Download the package for your version from the Apache Flink download page (http://flink.apache.org/downloads.html) and extract it:

curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz

Integrating with Hadoop

In Flink on YARN mode, Flink must be pointed at the Hadoop cluster by setting HADOOP_CONF_DIR and HADOOP_CLASSPATH.

Add the following lines to ~/.bash_profile:

$ vi ~/.bash_profile

export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*"

Source ~/.bash_profile to load the variables, then check that they are set correctly:

source ~/.bash_profile
echo $HADOOP_CONF_DIR
echo $HADOOP_CLASSPATH
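The `echo` commands above rely on reading the output by eye. A minimal scripted check, as a sketch: the function name `check_hadoop_env` is hypothetical, and it only verifies that both variables are non-empty and that HADOOP_CONF_DIR contains a yarn-site.xml, not that every classpath entry is correct.

```shell
#!/usr/bin/env bash
# Sanity check for the variables Flink on YARN reads from the environment.
# Returns 0 if everything looks fine, 1 otherwise (details go to stderr).
check_hadoop_env() {
  local status=0

  # HADOOP_CONF_DIR must point at a directory that contains yarn-site.xml,
  # which is where the YARN client configuration is read from.
  if [ -z "${HADOOP_CONF_DIR:-}" ]; then
    echo "HADOOP_CONF_DIR is not set" >&2
    status=1
  elif [ ! -f "$HADOOP_CONF_DIR/yarn-site.xml" ]; then
    echo "no yarn-site.xml in HADOOP_CONF_DIR=$HADOOP_CONF_DIR" >&2
    status=1
  fi

  # HADOOP_CLASSPATH must not be empty.
  if [ -z "${HADOOP_CLASSPATH:-}" ]; then
    echo "HADOOP_CLASSPATH is not set" >&2
    status=1
  fi

  return "$status"
}

# Usage after `source ~/.bash_profile`:
#   check_hadoop_env && echo "Hadoop environment OK"
```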

Configuration

Configuring yarn-session.sh

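Because HDP runs Hadoop jobs and accesses HDFS as the hdfs user, we set the HADOOP_USER_NAME variable in yarn-session.sh before the session starts; otherwise Flink may fail to start because of permission errors.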

$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh

#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

export HADOOP_USER_NAME=hdfs

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

Note: HADOOP_USER_NAME must be exported before the $JAVA_RUN line; otherwise the Java process started by the script will not see the variable.
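An alternative to editing the script (our assumption, not the setup used in this article) is to set the variable only for the command being run: a variable assignment placed in front of a command is exported to that process and its children, including the JVM started on the $JAVA_RUN line. Keep in mind that Hadoop only honors HADOOP_USER_NAME with simple (non-Kerberos) authentication. The wrapper name `run_as_hdfs_user` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: run a command with HADOOP_USER_NAME=hdfs in its
# environment, without modifying bin/yarn-session.sh. The prefix assignment
# is exported to the command and inherited by any JVM it launches.
run_as_hdfs_user() {
  HADOOP_USER_NAME=hdfs "$@"
}

# Usage (run from the Flink directory):
#   run_as_hdfs_user ./bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096
```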

conf/flink-conf.yaml (HA configuration)

To start an HA cluster, add the following settings to conf/flink-conf.yaml:

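High-availability mode (required): set high-availability to zookeeper in conf/flink-conf.yaml to enable HA mode. Alternatively, this option can be set to the fully qualified name (FQN) of the factory class that Flink should use to create the HighAvailabilityServices instance.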

high-availability: zookeeper

ZooKeeper quorum (required): a ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service.

high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181

Each addressX:port refers to a ZooKeeper server that Flink can reach at the given address and port.

ZooKeeper root (recommended): the ZooKeeper root node under which all cluster nodes are placed.

high-availability.zookeeper.path.root: /flink

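ZooKeeper cluster-id (recommended): the cluster-id ZooKeeper node, under which all coordination data required by the cluster is placed. When running on YARN, Flink generates the cluster-id automatically from the YARN application ID, so it is normally not set by hand in that case.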

high-availability.cluster-id: /default_ns # important: customize per cluster

Storage directory (required): JobManager metadata is persisted in the file system under storageDir; only a pointer to this state is stored in ZooKeeper.

high-availability.storageDir: hdfs:///flink/recovery

storageDir holds all the metadata needed to recover from a JobManager failure.

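After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual; they will start an HA cluster. Keep in mind that the ZooKeeper quorum must already be running when you call the scripts, and configure a separate ZooKeeper root path for each HA cluster you start.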

In addition to the HA settings, configure the maximum number of application attempts in conf/flink-conf.yaml:

yarn.application-attempts: 10

This means the application can be restarted 9 times (9 retries + 1 initial attempt) before YARN fails it.

Because our Hadoop cluster was built with HDP, it already has a ZooKeeper ensemble, so we reuse it for the HA setup:

high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10
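Before starting the session, it can help to confirm that the keys marked "required" above are actually present. A minimal sketch with a hypothetical function name: it only looks for `key:` at the start of a line (leading spaces allowed), so commented-out keys do not count, and it does not validate the values.

```shell
#!/usr/bin/env bash
# Check that the required HA keys appear in a flink-conf.yaml file.
# Returns 0 if all are present, 1 otherwise (missing keys go to stderr).
check_ha_conf() {
  local conf_file="$1" missing=0 key key_re
  for key in high-availability high-availability.zookeeper.quorum high-availability.storageDir; do
    # Escape dots so they match literally in the regular expression.
    key_re=$(printf '%s' "$key" | sed 's/\./\\./g')
    if ! grep -Eq "^[[:space:]]*${key_re}[[:space:]]*:" "$conf_file"; then
      echo "missing required key: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
#   check_ha_conf conf/flink-conf.yaml && echo "HA keys present"
```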

Configuring Hadoop yarn-site.xml

Set the maximum number of application attempts:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

The default in current YARN versions is 2 (meaning a single JobManager failure is tolerated).
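The two settings interact. When the per-application value that Flink requests (yarn.application-attempts) is larger than yarn.resourcemanager.am.max-attempts, or is not positive, the YARN ResourceManager falls back to the cluster-wide maximum. With the values used in this article (10 in flink-conf.yaml, 4 in yarn-site.xml), the effective limit is therefore 4 attempts, i.e. 3 restarts rather than 9. The sketch below just encodes that rule; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Effective number of ApplicationMaster attempts: the ResourceManager uses
# yarn.resourcemanager.am.max-attempts when the per-application value
# (Flink's yarn.application-attempts) is <= 0 or larger than it.
effective_attempts() {
  local flink_attempts="$1"   # yarn.application-attempts in flink-conf.yaml
  local yarn_max="$2"         # yarn.resourcemanager.am.max-attempts in yarn-site.xml
  if [ "$flink_attempts" -le 0 ] || [ "$flink_attempts" -gt "$yarn_max" ]; then
    echo "$yarn_max"
  else
    echo "$flink_attempts"
  fi
}

# Restarts = attempts - 1 (the first attempt is not a restart).
attempts=$(effective_attempts 10 4)
echo "effective attempts: $attempts, restarts: $((attempts - 1))"
# prints: effective attempts: 4, restarts: 3
```

To actually get the 9 restarts described for yarn.application-attempts: 10, yarn.resourcemanager.am.max-attempts would need to be at least 10 as well.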

Fixing java.lang.IllegalAccessError: tried to access method

On HDP, you need to remove Flink's shaded Hadoop uber jar and add the MapReduce jars to the YARN application classpath; otherwise the following error occurs:

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider

Delete or rename the jar:

rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
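If you prefer renaming over deleting, one option is to move the jar out of lib/, since Flink's start scripts build the classpath from the jars in that directory. A minimal sketch, assuming the Flink home directory is passed as the first argument; the function name and the `lib-disabled/` backup directory are hypothetical choices that keep the jar available for a rollback.

```shell
#!/usr/bin/env bash
# Move the shaded Hadoop uber jar(s) out of $FLINK_HOME/lib instead of deleting them.
disable_shaded_hadoop() {
  local flink_home="$1"
  local backup_dir="$flink_home/lib-disabled"   # hypothetical backup location
  local jar
  mkdir -p "$backup_dir"
  for jar in "$flink_home"/lib/flink-shaded-hadoop2-uber-*.jar; do
    [ -e "$jar" ] || continue                   # glob matched nothing
    mv "$jar" "$backup_dir"/
    echo "moved $(basename "$jar") to $backup_dir"
  done
}

# Usage:
#   disable_shaded_hadoop /root/flink-1.7.1
```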

Add the MapReduce jars to the YARN application classpath

In the Ambari UI, open service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath and append the following entries (the list is comma-separated):

/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*

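After the change, restart the affected YARN components. Ambari indicates which ones need restarting and can restart them with a single click.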

Flink logging configuration

By default, Flink ships with two logging configurations: log4j and logback.

If both are left in place, starting a Flink cluster or running a Flink job logs a warning suggesting that you remove one of them:

org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.

Either deleting or renaming one of the files works.

For example:

$ mv logback.xml logback.xml_bak
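The same fix can be scripted so it is safe to run repeatedly. A sketch, assuming the conf directory is passed as an argument and that log4j is the configuration you want to keep; the function name is hypothetical. It only renames logback.xml when both log4j.properties and logback.xml are present, exactly like the `mv` command above.

```shell
#!/usr/bin/env bash
# Keep only the log4j configuration in a Flink conf directory.
keep_log4j_only() {
  local conf_dir="$1"
  if [ -f "$conf_dir/log4j.properties" ] && [ -f "$conf_dir/logback.xml" ]; then
    mv "$conf_dir/logback.xml" "$conf_dir/logback.xml_bak"
    echo "renamed $conf_dir/logback.xml to logback.xml_bak"
  fi
}

# Usage:
#   keep_log4j_only /root/flink-1.7.1/conf
```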

Example configuration:

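vi /usr/local/flink-1.3.3/conf/log4j.properties
# MaxFileSize and MaxBackupIndex are RollingFileAppender options
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.append=true
# maximum size of a single log file
log4j.appender.file.MaxFileSize=100MB
# maximum number of rotated backup files to keep
log4j.appender.file.MaxBackupIndex=10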

Starting Flink

This section covers the two ways to start Flink.

Starting a long-running Flink cluster

A long-running Flink cluster is deployed with yarn-session.sh.

yarn-session.sh usage

$ ./bin/yarn-session.sh
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Main options:

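1. -n: number of TaskManagers

2. -jm: memory for the JobManager (MB by default)

3. -m: address of the JobManager to connect to

4. -tm: memory per TaskManager (MB by default)

5. -D: dynamic properties (property=value)

6. -d: detached mode; once the YARN session has been deployed to YARN, the client exits on its own.

7. -j: path to the Flink jar file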

Deploying a long-running Flink on YARN instance

bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job

This example requests:

  • 8 TaskManagers

  • 5 slots per TaskManager

  • 4 GB (4096 MB) of memory per TaskManager and 2 GB (2048 MB) for the JobManager

  • the application name pinpoint-flink-job
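The numbers in this command can be checked with a little arithmetic. The sketch below computes the total slot count (-n × -s), which matches the "default parallelism to 40" line in the job submission log later in this article, and the total container memory the session asks for once all TaskManagers are allocated. This is the requested amount; YARN may round each container up to its scheduler's minimum allocation.

```shell
#!/usr/bin/env bash
# Resource arithmetic for: bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096
taskmanagers=8     # -n
slots_per_tm=5     # -s
jm_mb=2048         # -jm
tm_mb=4096         # -tm

total_slots=$((taskmanagers * slots_per_tm))
total_mb=$((jm_mb + taskmanagers * tm_mb))

echo "total slots: $total_slots"                 # 40
echo "memory requested: ${total_mb} MB"          # 34816 MB
```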

Note: right after the long-running Flink on YARN instance is deployed, the Flink web UI shows 0 TaskManagers and 0 slots. Resources are allocated to a job only when it is submitted.


Submitting a job to the long-running Flink on YARN instance

Submit the job:

$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt

Input file: hdfs://xdata2/tmp/LICENSE-2.0.txt

Output file: hdfs://xdata2/tmp/wordcount_result.txt

The command prints the following log:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40
2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms

After submission, the job and its progress are visible in the Flink web UI.


Checking the job result

Use the hadoop command to read the result:

[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...

Running a single Flink job on YARN

To start Flink on YARN just to run a single job, use bin/flink run directly.

Example:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

The YARN session command-line options are also available to ./bin/flink, prefixed with y (or yarn for long options).

When the command runs, YARN starts a dedicated Flink on YARN instance just for this job, and the job appears in the Flink web UI.
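The y/yarn prefix rule described above can be written as a small helper. The function name `to_flink_run_opts` is hypothetical, and it only rewrites option names; it does not check that the resulting option exists. For example, `-n 2 -s 5 -tm 4096` for yarn-session.sh becomes `-yn 2 -ys 5 -ytm 4096` for `./bin/flink run -m yarn-cluster`.

```shell
#!/usr/bin/env bash
# Apply the prefix rule for `flink run`: short options "-x" become "-yx",
# long options "--name" become "--yarnname". Arguments that do not start
# with "-" (option values) are passed through unchanged.
to_flink_run_opts() {
  local out="" arg
  for arg in "$@"; do
    case "$arg" in
      --*) arg="--yarn${arg#--}" ;;
      -*)  arg="-y${arg#-}" ;;
    esac
    out="$out $arg"
  done
  printf '%s\n' "${out# }"
}

# yarn-session.sh -n 2 -s 5 -tm 4096  ->  flink run ... -yn 2 -ys 5 -ytm 4096
to_flink_run_opts -n 2 -s 5 -tm 4096
```

Because the result is a single string, using it as `./bin/flink run -m yarn-cluster $(to_flink_run_opts -n 2 -tm 4096) ./examples/batch/WordCount.jar` relies on word splitting, so option values must not contain spaces.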


Check the execution output:

Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...

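Summary

Choose between the two Flink on YARN deployment modes based on your needs; you can use either one on its own or combine them.

We recommend running important jobs in their own dedicated instance, while other jobs can share a long-running session; resources that are not in use are released.

Standalone mode will be covered in a later article.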

References

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html

https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html

https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html
