As we saw in the earlier article 《Apache Flink本地部署》 (Apache Flink local deployment), the cluster is started by running the start-cluster.sh script.
Our walk through the source code therefore begins with Flink's bash scripts.
start-cluster.sh
```bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Start the JobManager instance(s)
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s)
TMSlaves start
```
As the code shows, whether the cluster starts in HA mode or non-HA mode, the script ends up calling jobmanager.sh.
jobmanager.sh
```bash
STARTSTOP=$1
...
ENTRYPOINT=standalonesession
...
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
```
Here STARTSTOP=$1, which in our case is start, so execution takes the flink-daemon.sh branch.

flink-daemon.sh

Its core logic falls into the following three code blocks.

```bash
# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array
```

From the usage definition we can see that flink-daemon.sh can start a taskexecutor, zookeeper, historyserver, standalonesession, or standalonejob.

Here DAEMON=$2, and from the value passed in by jobmanager.sh we know DAEMON=standalonesession.
```bash
case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac
```
Since DAEMON=standalonesession, this case block selects CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.
```bash
case $STARTSTOP in

    (start)
        # Rotate log files
        rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"

        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

          count="${#active[@]}"

          if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

        # Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

    ...
esac
```
The case $STARTSTOP in block implements three operations: start, stop, and stop-all. We start the cluster through the start branch, which rotates the log files, warns if instances of the daemon are already running, launches ${CLASS_TO_RUN} as a background JVM process, and records its pid.
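To make this dispatch-and-launch pattern concrete, here is a self-contained Java sketch of the same idea: map a daemon name to a main class, spawn it as a background JVM, and append the child pid to a pid file, just as the start branch does with echo $mypid >> "$pid". All names here (MiniDaemonLauncher, the .out/.pid files) are illustrative, and the sketch assumes the target class is on the current classpath; it is not Flink code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;

// Hypothetical mini-launcher mirroring flink-daemon.sh: name -> main class -> background JVM.
public class MiniDaemonLauncher {

    // The same mapping the `case $DAEMON in` block expresses in shell.
    private static final Map<String, String> CLASS_TO_RUN = Map.of(
        "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
        "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint");

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("Usage: MiniDaemonLauncher <daemon>");
            System.exit(1);
        }

        String daemon = args[0]; // like DAEMON=$2 in the script
        String mainClass = CLASS_TO_RUN.get(daemon);
        if (mainClass == null) {
            System.err.println("Unknown daemon '" + daemon + "'.");
            System.exit(1);
        }

        // Equivalent of: $JAVA_RUN ... ${CLASS_TO_RUN} > "$out" 2>&1 &
        Process process = new ProcessBuilder(
                "java", "-classpath", System.getProperty("java.class.path"), mainClass)
            .redirectErrorStream(true)                 // 2>&1
            .redirectOutput(new File(daemon + ".out")) // > "$out"
            .start();

        // Equivalent of: echo $mypid >> "$pid"
        Files.writeString(Path.of(daemon + ".pid"), process.pid() + System.lineSeparator(),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);

        System.out.println("Started " + daemon + " daemon with pid " + process.pid() + ".");
    }
}
```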
StandaloneSessionClusterEntrypoint.java
```java
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {

	public StandaloneSessionClusterEntrypoint(Configuration configuration) {
		super(configuration);
	}

	@Override
	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
		return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
	}

	public static void main(String[] args) {
		// startup checks and logging
		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
		SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		EntrypointClusterConfiguration entrypointClusterConfiguration = null;
		final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

		try {
			entrypointClusterConfiguration = commandLineParser.parse(args);
		} catch (FlinkParseException e) {
			LOG.error("Could not parse command line arguments {}.", args, e);
			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
			System.exit(1);
		}

		Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
	}
}
```
The main method of StandaloneSessionClusterEntrypoint executes the following steps (a simplified sketch follows the list):
- Parse the command-line arguments with the commandLineParser object
- Load the configuration via loadConfiguration
- Instantiate a StandaloneSessionClusterEntrypoint object from that configuration
- Finally, run the StandaloneSessionClusterEntrypoint instance through ClusterEntrypoint's runClusterEntrypoint method
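As a rough illustration of that four-step flow (not Flink's actual API), a stripped-down entrypoint might look like the sketch below, with parseArgs and loadConfiguration standing in for CommandLineParser and the real flink-conf.yaml loading:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stripped-down entrypoint mirroring the four steps of main() above.
public class MiniSessionEntrypoint {

    private final Map<String, String> configuration;

    MiniSessionEntrypoint(Map<String, String> configuration) {
        this.configuration = configuration;
    }

    void run() {
        // In Flink this is where runClusterEntrypoint() starts all cluster services.
        System.out.println("Entrypoint running with configuration: " + configuration);
    }

    // Step 1: parse "--key value" pairs (CommandLineParser does this in Flink).
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                parsed.put(args[i].substring(2), args[i + 1]);
            }
        }
        return parsed;
    }

    // Step 2: merge defaults under the CLI values (the real loadConfiguration reads flink-conf.yaml).
    static Map<String, String> loadConfiguration(Map<String, String> cli) {
        Map<String, String> conf = new HashMap<>();
        conf.put("rest.port", "8081"); // default
        conf.putAll(cli);              // CLI values override defaults
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> cli = parseArgs(args);                          // 1. parse arguments
        Map<String, String> conf = loadConfiguration(cli);                  // 2. load configuration
        MiniSessionEntrypoint entrypoint = new MiniSessionEntrypoint(conf); // 3. instantiate
        entrypoint.run();                                                   // 4. run the entrypoint
    }
}
```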
ClusterEntrypoint.java
```java
private void runCluster(Configuration configuration) throws Exception {
	synchronized (lock) {
		initializeServices(configuration);

		// write host information into configuration
		configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
		configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

		final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

		clusterComponent = dispatcherResourceManagerComponentFactory.create(
			configuration,
			commonRpcService,
			haServices,
			blobServer,
			heartbeatServices,
			metricRegistry,
			archivedExecutionGraphStore,
			new AkkaQueryServiceRetriever(
				metricQueryServiceActorSystem,
				Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
			this);

		clusterComponent.getShutDownFuture().whenComplete(
			(ApplicationStatus applicationStatus, Throwable throwable) -> {
				if (throwable != null) {
					shutDownAsync(
						ApplicationStatus.UNKNOWN,
						ExceptionUtils.stringifyException(throwable),
						false);
				} else {
					// This is the general shutdown path. If a separate more specific shutdown was
					// already triggered, this will do nothing
					shutDownAsync(
						applicationStatus,
						null,
						true);
				}
			});
	}
}
```
The flow inside the runCluster method is as follows:
```java
protected void initializeServices(Configuration configuration) throws Exception {

	LOG.info("Initializing cluster services.");

	synchronized (lock) {
		final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
		final String portRange = getRPCPortRange(configuration);

		commonRpcService = createRpcService(configuration, bindAddress, portRange);

		// update the configuration used to create the high availability services
		configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
		configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

		haServices = createHaServices(configuration, commonRpcService.getExecutor());
		blobServer = new BlobServer(configuration, haServices.createBlobStore());
		blobServer.start();
		heartbeatServices = createHeartbeatServices(configuration);
		metricRegistry = createMetricRegistry(configuration);

		// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
		// Start actor system for metric query service on any available port
		metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
		metricRegistry.startQueryService(metricQueryServiceActorSystem, null);

		archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());

		transientBlobCache = new TransientBlobCache(
			configuration,
			new InetSocketAddress(
				commonRpcService.getAddress(),
				blobServer.getPort()));
	}
}
```
- initializeServices(configuration): initialize the cluster services
  - Create the RPC service
  - Create the HA services
  - Create the blob server
  - Create the heartbeat services
  - Create the metric registry
  - Create the ActorSystem for the metric query service
  - Create the ArchivedExecutionGraphStore
  - Create the TransientBlobCache
- Create the dispatcherResourceManagerComponentFactory object
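One detail in runCluster worth pausing on is the shutdown wiring: the created clusterComponent exposes a shut-down future, and a whenComplete callback picks the shutdown path exactly once, depending on whether the component terminated normally or with an error. Below is a minimal standalone sketch of that pattern, with hypothetical names rather than Flink's actual types:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the shutdown-future pattern used in runCluster():
// the component exposes a future; completing it (normally or exceptionally)
// triggers the appropriate shutdown path.
public class ShutdownFutureDemo {

    enum ApplicationStatus { SUCCEEDED, FAILED, UNKNOWN }

    public static void main(String[] args) {
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                // component died with an error: shut down with UNKNOWN status
                System.out.println("Shutting down after failure: " + throwable.getMessage());
            } else {
                // regular path: shut down with the status the component reported
                System.out.println("Shutting down with status " + status);
            }
        });

        // Somewhere else, the cluster component eventually completes the future:
        shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
        // or: shutDownFuture.completeExceptionally(new RuntimeException("dispatcher lost"));
    }
}
```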
In the parameters of dispatcherResourceManagerComponentFactory's create method we can see all of the services just initialized:
- configuration
- commonRpcService
- haServices
- blobServer
- heartbeatServices
- metricRegistry
- archivedExecutionGraphStore
- AkkaQueryServiceRetriever
Much of this is the information behind what the Flink web UI displays.
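As a rough picture of the pattern (hypothetical types, not Flink's API): the factory receives every shared service as a parameter and assembles the cluster component from them, which is why the create call lists them all.

```java
// Hypothetical sketch: every shared service created in initializeServices() is
// injected into one factory, which assembles the cluster component that backs
// the Dispatcher, ResourceManager, and the web UI endpoint.
interface RpcService {}
interface HighAvailabilityServices {}
interface BlobService {}

class ClusterComponent {
    ClusterComponent(RpcService rpc, HighAvailabilityServices ha, BlobService blobs) {
        // In Flink this is where the Dispatcher, ResourceManager and REST endpoint
        // are built, all sharing the injected services.
        System.out.println("Cluster component assembled.");
    }
}

class MiniComponentFactory {
    ClusterComponent create(RpcService rpc, HighAvailabilityServices ha, BlobService blobs) {
        return new ClusterComponent(rpc, ha, blobs);
    }
}

public class FactoryDemo {
    public static void main(String[] args) {
        // Stand-in service instances; the real ones come from initializeServices().
        ClusterComponent component = new MiniComponentFactory().create(
            new RpcService() {}, new HighAvailabilityServices() {}, new BlobService() {});
    }
}
```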
Summary
At this point we have traced the full path Apache Flink takes from the start-cluster.sh script to the final running program. Starting with the next article, we will use the SocketWindowWordCount example to walk through what happens once a job is running.