内容简介:Supervisor 节点可以理解为单机任务调度器,它负责监听 nimbus 节点的任务资源分配,启动相应的 worker 进程执行 nimbus 分配给当前节点的任务,同时监测 worker 的运行状态,一旦发现有 worker 运行异常,就会杀死该 worker 进程,并将原先分配给 worker 的任务交还给 nimbus 节点进行重新分配。Supervisor 节点的启动过程位于 Supervisor 类中,main 方法的实现比较简单,主要就是创建了一个 Supervisor 类对象,并调用实例
Supervisor 节点可以理解为单机任务调度器,它负责监听 nimbus 节点的任务资源分配,启动相应的 worker 进程执行 nimbus 分配给当前节点的任务,同时监测 worker 的运行状态,一旦发现有 worker 运行异常,就会杀死该 worker 进程,并将原先分配给 worker 的任务交还给 nimbus 节点进行重新分配。
Supervisor 节点的启动过程位于 Supervisor 类中,main 方法的实现比较简单,主要就是创建了一个 Supervisor 类对象,并调用实例方法 Supervisor#run
,该方法的实现如下:
public void run() { try { /* * 解析配置文件: * 1. 解析 default.yaml * 2. 解析 storm.yaml * 3. 解析 -Dstorm.options 指定的命令行参数 * 4. 替换所有配置项中的 JSTORM_HOME 占位符 */ Map<Object, Object> conf = Utils.readStormConfig(); // 确保当前为集群运行模式 StormConfig.validate_distributed_mode(conf); // 创建进程文件: ${storm.local.dir}/supervisor/pids/${pid} this.createPid(conf); // 创建并启动 supervisor SupervisorManger supervisorManager = this.mkSupervisor(conf, null); JStormUtils.redirectOutput("/dev/null"); // 注册 SupervisorManger,当 JVM 进程停止时执行 shutdown 逻辑 this.initShutdownHook(supervisorManager); // 循环监测 shutdown 方法是否执行完毕 while (!supervisorManager.isFinishShutdown()) { try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } } // 省略 catch 代码块 }
整个方法的逻辑比较清晰(如代码注释),核心实现位于 Supervisor#mkSupervisor
方法中,该方法主要用于创建和启动 supervisor 节点,基本执行流程如下:
SyncSupervisorEvent#run()
Supervisor 节点在启动时首先会在本地创建并清空临时目录(路径: supervisor/tmp
),Supervisor 从 nimbus 节点下载下来的文件会临时存放在这里,包括 stormcode.cer、stormconf.cer、stormjar.jar,以及 lib 目录下面的文件等,经过简单处理之后会将其复制到 stormdist/${topology_id}
本地目录中,supervisor 本地文件说明如下:
+ ${supervisor_local_dir} | ---- + supervisor | ---- | ---- + stormdist | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + resources: 指定 topology 程序包 resources 目录下面的所有文件 | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件 | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件 | ---- | ---- + localstate: 本地状态信息 | ---- | ---- + tmp: 临时目录,从 nimbus 下载的文件的临时存储目录,简单处理之后复制到 stormdist/${topology_id} | ---- | ---- | ---- + ${uuid} | ---- | ---- | ---- | ---- + stormjar.jar: 从 nimbus 节点下载下来的 jar 文件 | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件(从 inbox 目录复制过来) | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件 | ---- + workers | ---- | ---- + ${worker_id} | ---- | ---- | ---- + pids | ---- | ---- | ---- | ---- + ${pid}: 指定 worker 进程 ID | ---- | ---- | ---- + heartbeats | ---- | ---- | ---- | ---- + ${worker_id}: 指定 worker 心跳信息(心跳时间、worker 的进程 ID)
接下来 supervisor 会创建 StormClusterState 对象,用于操作 ZK 集群,同时还会创建一个 WorkerReportError 类对象,用于上报 worker 的运行错误数据到 ZK,该类仅包含一个实例方法 report,用于执行上报逻辑。然后 supervisor 节点会创建一个 LocalState 对象用于存储节点的状态信息,这是一个简单、低效的键值存储数据库,每一次操作都会落盘,在这里对应的落盘目录是 supervisor/localstate
。Supervisor 的 ID(UUID 字符串) 就存储在该数据库中,supervisor 启动时会先尝试从本地状态信息对象中获取 ID 值,如果不存在的话就会创建一个新的 UUID 字符串作为 ID。
Supervisor 节点在启动的过程中会初始化心跳机制,间隔指定时间将当前节点的相关信息上报给 ZK(路径: supervisors/${supervisor_id}
),包含当前 supervisor 节点的主机名、ID、最近一次上报时间、截止上次上报节点的运行时间,以及 worker 端口列表信息。相关信息的初始化在 Heartbeat 类对象实例化时进行设置,期间会依据当前机器 CPU 核心数和物理内存大小计算允许的 worker 端口数目,并默认从 6800 端口号开始分配 worker 端口。Supervisor 节点会启动一个线程,默认每间隔 60 秒调用 Heartbeat#update
方法同步心跳信息到 ZK,该方法的实现如下:
public void update() { // 更新本次上报时间为当前时间(单位:秒) supervisorInfo.setTimeSecs(TimeUtils.current_time_secs()); // 更新截止目前节点的运行时间(单位:秒) supervisorInfo.setUptimeSecs(TimeUtils.current_time_secs() - startTime); // 依据具体配置和资源占用,调整端口号列表 this.updateSupervisorInfo(); try { // 将 supervisor 信息写入 ZK:supervisors/${supervisor_id} stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo); } catch (Exception e) { LOG.error("Failed to update SupervisorInfo to ZK", e); } }
具体过程如代码注释,下面是一个实际的心跳信息示例:
{ "hostName": "10.38.164.192", "supervisorId": "980bbcfd-5438-4e25-aee9-bf411304a446", "timeSecs": 1533373753, "uptimeSecs": 2879598, "workerPorts": [ 6912, 6900, 6901, 6902, 6903, 6904, 6905, 6906, 6907, 6908, 6909, 6910, 6911 ] }
下面来重点看一下 supervisor 节点领取分配给当前节点的任务并启动执行的过程。该过程的实现代码块如下:
/* * 5. 启动并定期执行 SyncSupervisorEvent#run() 方法(默认间隔 10 秒),从 nimbus 节点领取分配给当前节点的任务并启动执行 */ ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<>(); SyncProcessEvent syncProcessEvent = new SyncProcessEvent( supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError, stormClusterState); EventManagerImp syncSupEventManager = new EventManagerImp(); AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager); threads.add(syncSupEventThread); SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent( supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb); // ${supervisor.monitor.frequency.secs},默认为 10 秒 int syncFrequency = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)); EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequency); /* * 每间隔一段时间(默认为 10 秒)调用 EventManagerPusher#run(), * 本质上是调用 EventManagerImp#add(RunnableCallback) 将 syncSupervisorEvent 记录到自己的阻塞队列中, * 同时 EventManagerImp 也会循环消费阻塞队列,取出其中的 syncSupervisorEvent,并应用其 run 方法:SyncSupervisorEvent#run() */ AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher); threads.add(syncSupervisorThread);
要理解该过程的运行机制,我们应该倒着来看相应的源码实现,首先看一下代码块的倒数第二行:
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
由前面我们对 storm 基本线程模型的分析可以知道,这行代码会启动一个线程去循环执行入参回调的 run 方法,这里也就是 EventManagerPusher#run
方法,该方法的实现比较简单:
@Override public void run() { eventManager.add(event); }
也就是不断的调用 EventManager#add
方法(默认间隔时间为 10 秒),继续往前看我们知道这里的 EventManager 类实际实现是 EventManagerImp,而不断的调用其 add 方法添加的 event 本质上就是一个 SyncSupervisorEvent 实例对象。EventManagerImp 维护了一个阻塞队列来不断记录加入的 event,它本身也是一个回调,再往前看我们就可以看到它在实例化时也被 AsyncLoopThread 启动, EventManagerImp#run
方法实现如下:
public void run() { try { RunnableCallback r = queue.take(); if (r == null) { return; } r.run(); e = r.error(); this.processInc(); } catch (InterruptedException e) { LOG.info("Interrupted when processing event."); } }
该方法就是不断的从阻塞队列中取出相应的回调并应用其 run 方法,也就是不断应用 SyncSupervisorEvent#run
方法。
以上就是步骤五的整体逻辑,简单描述就是定期的往阻塞队列中添加 SyncSupervisorEvent 事件,而线程会循环的消费队列,取出事件并应用事件的 run 方法。下面来深入分析一下 SyncSupervisorEvent 的 run 方法,该方法所做的工作也就是 supervisor 的核心逻辑,主要可以概括为 3 点:
- 从 ZK 上下载任务分配信息,并更新到本地
- 从 nimbus 节点上下载 topology 对应的 jar 和配置文件
- 启动 worker 执行分配给当前 supervisor 的 topology 任务
SyncSupervisorEvent#run
方法的实现比较长,下面按照执行步骤逐步拆分进行分析,首先来看一下从 ZK 上下载任务分配信息,并更新到本地的过程,相应实现如下:
/* * 1.1. 同步所有 topology 的任务分配信息及其版本信息到本地 */ if (healthStatus.isMoreSeriousThan(HealthStatus.ERROR)) { // 检查当前 supervisor 的状态信息,如果是 PANIC 或 ERROR,则清除所有本地的任务分配相关信息 assignmentVersion.clear(); assignments.clear(); LOG.warn("Supervisor machine check status: " + healthStatus + ", killing all workers."); } else { // 同步所有 topology 的任务分配信息及其版本(即更新 assignmentVersion 和 assignments 参数) this.getAllAssignments(assignmentVersion, assignments, syncCallback); } LOG.debug("Get all assignments " + assignments); /* * 1.2. 从 supervisor 本地(supervisor/stormdist/)获取已经下载的所有的 topologyId */ List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf); LOG.debug("Downloaded storm ids: " + downloadedTopologyIds); /* * 1.3. 获取分配给当前 supervisor 的任务信息:<port, LocalAssignments> */ Map<Integer, LocalAssignment> zkAssignment = this.getLocalAssign(stormClusterState, supervisorId, assignments); /* * 1.4. 更新 supervisor 本地的任务分配信息 */ Map<Integer, LocalAssignment> localAssignment; try { LOG.debug("Writing local assignment " + zkAssignment); localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS); // local-assignments if (localAssignment == null) { localAssignment = new HashMap<>(); } localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment); } catch (IOException e) { LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " to localState failed"); throw e; }
Supervisor 节点在本地会缓存任务分配信息,同时会定期从 ZK 同步最新的任务分配信息到本地,从 ZK 上获取任务分配信息的逻辑位于 SyncSupervisorEvent#getAllAssignments
方法中,方法会从 ZK 的 assignments 路径下获取所有的 topologyId,并与本地比较对应 topology 的任务分配信息版本,如果版本有更新则更新本地缓存的任务分配信息。
接下来 supervisor 会计算所有需要下载的 topology,包括需要更新的、需要重新下载的(之前下载有失败),以及在当前节点进行灰度的,并从 nimbus 节点下载各个 topology 对应的文件,包括 stormjar.jar、stormcode.ser、stormconf.ser,以及 lib 目录下面的依赖文件(如果存在的话),最后从本地删除那些之前下载过但是本次未分配给当前 supervisor 节点的 topology 文件,相应实现如下:
/* * 2.1. 获取所有需要执行下载操作的 topology_id 集合(包括需要更新的、需要重新下载,以及在当前节点灰度的) */ Set<String> updateTopologies = this.getUpdateTopologies(localAssignment, zkAssignment, assignments); Set<String> reDownloadTopologies = this.getNeedReDownloadTopologies(localAssignment); if (reDownloadTopologies != null) { updateTopologies.addAll(reDownloadTopologies); } // 获取灰度发布且指定在当前 supervisor 的 topology:[topology_id, Pair(host, port)] Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts = this.getUpgradeTopologies(stormClusterState, localAssignment, zkAssignment); if (upgradeTopologyPorts.size() > 0) { LOG.info("upgrade topology ports:{}", upgradeTopologyPorts); updateTopologies.addAll(upgradeTopologyPorts.keySet()); } /* * 2.2. 从 nimbus 下载对应的 topology 任务代码 */ // 从 ZK 上获取分配给当前 supervisor 的 [topologyId, master-code-dir] 信息 Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId); // downloadFailedTopologyIds which can't finished download binary from nimbus Set<String> downloadFailedTopologyIds = new HashSet<>(); // 记录所有下载失败的 topologyId // 从 nimbus 下载相应的 topology jar 文件到 supervisor 本地 this.downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologies, assignments, downloadFailedTopologyIds); /* * 2.3. 删除无用的 topology 相关文件(之前下载过,但是本次未分配给当前 supervisor) */ this.removeUselessTopology(topologyCodes, downloadedTopologyIds);
文件下载的逻辑位于 SyncSupervisorEvent#downloadTopology
方法中,文件下载的过程可以概括为以下 5 个步骤:
${storm.local.dir}/supervisor/tmp/${uuid} ${storm.local.dir}/supervisor/stormdist/${topology_id} ${storm.local.dir}/supervisor/stormdist/${topology_id}/timestamp
最后 supervisor 节点会调用 SyncProcessEvent#run
方法杀死状态异常的 worker,同时启动新的 worker 执行分配的任务:
/* * 3. kill bad workers, start new workers */ syncProcesses.run(zkAssignment, downloadFailedTopologyIds, upgradeTopologyPorts); // SyncProcessEvent#run public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds, Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts) { LOG.debug("Syncing processes, interval (sec): " + TimeUtils.time_delta(lastTime)); lastTime = TimeUtils.current_time_secs(); try { if (localAssignments == null) { localAssignments = new HashMap<>(); } LOG.debug("Assigned tasks: " + localAssignments); /* * 3.1 获取本地所有 worker 的状态信息:Map<worker_id [WorkerHeartbeat, state]> */ Map<String, StateHeartbeat> localWorkerStats; try { // Map[workerId, [worker heartbeat, state]] localWorkerStats = this.getLocalWorkerStats(conf, localState, localAssignments); } catch (Exception e) { LOG.error("Failed to get local worker stats"); throw e; } LOG.debug("Allocated: " + localWorkerStats); /* * 3.2 杀死无用的 worker,并从 localWorkerStats 中移除 */ Map<String, Integer> taskCleanupTimeoutMap; Set<Integer> keepPorts = null; try { // [topology_id, cleanup_second] taskCleanupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); // task-cleanup-timeout // 对于一些状态为 disallowed/timedOut 的 worker 进行 kill,并清空相应的数据,同时返可用的 worker port keepPorts = this.killUselessWorkers(localWorkerStats, localAssignments, taskCleanupTimeoutMap); localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleanupTimeoutMap); } catch (IOException e) { LOG.error("Failed to kill workers", e); } // 3.3 检测 worker 是否正在启动中,清空处于运行态和启动失败 worker 的相应数据(workerIdToStartTimeAndPort 和 portToWorkerId) this.checkNewWorkers(conf); // 3.4 标记需要重新下载的 topology(没有启动成功,同时下载时间已经超过 2 分钟) this.checkNeedUpdateTopologies(localWorkerStats, localAssignments); // 3.5 启动新的 worker 执行 topology 任务 this.startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); // 3.6 启动相应的 worker 执行在当前节点 灰度的 topology 任务 this.restartUpgradingWorkers(localAssignments, localWorkerStats, upgradeTopologyPorts); } catch (Exception e) { LOG.error("Failed to init SyncProcessEvent", e); } }
无论是新任务分配,还是灰度更新,启动 worker 的过程都是调用了 SyncProcessEvent#startWorkers
方法,该方法为每个新的 worker 基于 UUID 创建一个 workerId,以及进程目录 ${storm.local.dir}/workers/${worker_id}/pids
,并调用 SyncProcessEvent#doLaunchWorker
方法启动 worker,同时更新 worker 在本地的相应数据。Worker 进程的启动和运行机制将在下一篇中进行详细说明。
在分析 nimbus 节点启动过程中有一步会启动一个 HTTP 服务,用于接收查询 nimbus 节点本地日志和配置等数据的需求,supervisor 节点的启动过程也同样包含这样一个过程。Supervisor 的 HTTP 服务默认会监听在 7622 端口,用于接收来自 UI 的请求。
最后对于集群模式,如果配置了 supervisor.enable.check=true
则 supervisor 节点在启动时会创建一个线程用于定期检查 supervisor 的运行状况,另外还会启动一个线程用于同步 nimbus 的配置信息到本地节点。最后会创建并返回一个 SupervisorManger 类对象,用于对于当前 supervisor 节点进行管理。
到此,supervisor 节点基本启动完成了,supervisor 会定期基于 ZK 从 nimbus 节点领取任务,然后启动 worker 去执行任务,而启动 worker 的过程我们将在下一篇中进行详细分析。
(本篇完)
转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。