内容简介:上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:
上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过 submitTopology
方法向 storm 集群的 nimbus 节点提交任务。Nimbus 以 Thrift RPC 服务的方式运行,相应 thrift 接口方法实现位于 ServiceHandler 类中,下面我们从 ServiceHandler#submitTopology
方法切入,分析 nimbus 节点之于客户端提交任务的资源分配过程,该方法包装了 ServiceHandler#submitTopologyWithOpts
方法。
Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。 ServiceHandler#submitTopologyWithOpts
方法统一处理这三种情况,但是不管哪种提交方式都会首先验证 topology 名称和配置的合法性,然后基于具体提交类型分而治之。
灰度发布 & 热部署
首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:
// 获取指定 topology 的运行数据 TopologyInfo topologyInfo = this.getTopologyInfo(topologyId); if (topologyInfo == null) { throw new TException("Failed to get topology info"); } // 获取指定的 worker 数目:${topology.upgrade.worker.num} int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf); // 获取指定的组件名称:${topology.upgrade.component} String component = ConfigExtension.getUpgradeComponent(serializedConf); // 获取指定的 worker 列表:${topology.upgrade.workers} Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf); // 判定 topology master 是不是使用独立的 worker if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) { throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!"); } // 灰度发布 return this.grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);
对于允许灰度发布的场景,storm 会基于当前提交 topology 的配置首先会尝试获取以下 3 个参数用于挑选 worker 进行发布:
- topology.upgrade.worker.num
- topology.upgrade.component
- topology.upgrade.workers
如果同时指定了多个参数,方法会基于一定的优先级进行决策,具体如下:
-
如果参数
topology.upgrade.workers
不为空则忽略其他参数,挑选指定的 worker 进行发布,需要注意的是这些 worker 发布完之后,这个参数就自动置空 -
否则查看参数
topology.upgrade.component
是否为空,如果不为空还需要查看参数topology.upgrade.worker.num
是否为 0, 如果不为 0 则挑选指定工作组件下topology.upgrade.worker.num
指定数目的 worker 进行发布,否则对这些工作组件下所有 worker 进行发布 -
如果上面两个都为空,则随机挑选
topology.upgrade.worker.num
个 worker 进行发布
灰度发布的具体执行流程位于 ServiceHandler#grayUpgrade
方法中,该方法实现比较冗长,故不在此贴出,下面参考源码和官网文档对发布的过程进行说明:
-
方法首先尝试从 ZK 获取当前 topology 对应的基本信息(路径:
/topology/${topology_id}
)和灰度发布信息(路径:/gray_upgrade/${topology_id}
),以及任务分配信息(路径:assignments/${topology_id}
)。 - 如果存在灰度发布信息,则判断对应的灰度状态(已过期 / 已完成 / 进行中),如果正在灰度中则拒绝本次灰度请求,否则(包含不存在灰度发布信息的情况)继续执行灰度发布。
-
方法利用 GrayUpgradeConfig 对象封装灰度发布信息,并写入到 ZK 的
/gray_upgrade/${topology_id}
路径下,同时设置config.continueUpgrading=true
。 -
Topology Master 有一个线程 GrayUpgradeHandler 会定时读取该节点的配置,检测到有灰度发布配置且
continueUpgrading=true
时,将分配指定数目的 worker,添加到 ZK 的/gray_upgrade/${topology_id}/upgrading_workers
路径下,并设置continueUpgrading=false
(防止自动进行后续的灰度发布)。 - SyncSupervisorEvent 会定时检查每个拓扑的 upgrading_workers 节点,一旦有数据就和自身的 IP 和端口列表进行对比,如果有属于该 supervisor 节点的灰度发布就下载最新的 storm-code 和 storm-jar,然后重启 worker,同时将 worker 添加到 ZK 的 upgraded_workers 节点下。
- GrayUpgradeHandler 检测 ZK,如果 upgraded_workers 的 worker 数大于等于当前总 worker 数减 1(topology master 组件占用),则认为此次灰度发布已经完成,删除 ZK 上的灰度发布配置、upgrading_workers,以及 upgraded_workers。
如果只想升级部分 worker 或特定组件,可以用 complete_upgrade 强制完成升级。灰度发布过程中使用单独的 upgrading_workers 和 upgraded_workers 的设计主要是为了避免同步问题。如果将这些信息写在 GrayUpgradeConfig 类中可能会涉及到多个 supervisor 节点同时更新 workers 的情况,而使用单独的节点则只需要在这个节点下添加和删除子节点,不会有同步问题。
热部署和灰度发布从形式上来看都是对运行中 topology 的更新替换操作,但是对于 nimbus 来说,在处理上却是两条不同的分支,实际上热部署与新任务提交在处理过程上更加形似,毕竟热部署的过程就是杀死处于运行中的 topology 然后执行新任务提交的过程,所以接下来我们主要分析新任务的调度细节。
新任务提交
对于新提交的任务来说,storm 会为该 topology 执行一些准备和验证工作,并在 ZK 上创建相应的结点记录该 topology 的元数据和任务分配信息,然后为该 topology 生成一个事件提交给任务分配队列等待 nimbus 节点为当前 topology 制定运行方案,并在执行成功后发送相应的通知事件。相关实现如下:
// 对当前 topology 配置进行规范化,并附加一些必要的配置 Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology); LOG.info("Normalized configuration:" + stormConf); // 合并集群配置和拓扑配置 Map<Object, Object> totalStormConf = new HashMap<>(conf); totalStormConf.putAll(stormConf); // 确定 topology 中各个组件的并行度,保证不超过当前 topology 允许的最大值 StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true); /* * 验证 topology 的基本结构信息: * 1. 验证 topologyName,组件 ID 是否合法 * 2. 验证是否存在缺失 input 声明的 spout * 3. 验证 woker 和 acker 数目参数配置 */ Common.validate_basic(normalizedTopology, totalStormConf, topologyId); StormClusterState stormClusterState = data.getStormClusterState(); // 创建 /local-dir/nimbus/${topology_id}/xxxx 文件,并将元数据同步到 ZK this.setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false); // wait for blob replication before activate topology this.waitForDesiredCodeReplication(conf, topologyId); // generate TaskInfo for every bolt or spout in ZK : /ZK/tasks/topoologyId/xxx // 为当前 topology 在 ZK 上生成 task 信息:/tasks/${topology_id} this.setupZkTaskInfo(conf, topologyId, stormClusterState); // mkdir topology error directory : taskerrors/${topology_id} String path = Cluster.taskerror_storm_root(topologyId); stormClusterState.mkdir(path); String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); // gray_upgrade/${topology_id} stormClusterState.mkdir(grayUpgradeBasePath); // gray_upgrade/${topology_id}/upgraded_workers stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId)); // gray_upgrade/${topology_id}/upgrading_workers stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId)); // 为当前 topology 执行任务分配 LOG.info("Submit topology {} with conf {}", topologyName, serializedConf); this.makeAssignment(topologyName, topologyId, options.get_initial_status()); // push start event after startup double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); // ${topology.metric.sample.rate},默认是 0.05 StartTopologyEvent.pushEvent(topologyId, metricsSampleRate); this.notifyTopologyActionListener(topologyName, "submitTopology");
下面对源码中涉及到的相关流程进行进一步分析。对于新提交的任务首先会执行一些准备工作,包括:
- 规范化 topology 的配置信息
- 确定 topology 各个组件的并行度,保证不超过允许的最大值
- 验证 topology 的基本结构信息(topology 名称和组件 ID 的合法性、spout 是否缺失 input 声明,以及验证 worker 和 acker 的参数配置等)
然后 storm 会创建或更新当前 topology 对象的序列化文件(stormcode.ser)和配置信息文件(stormconf.ser)到 blobstore 中,如果是采用 nimbus 本地模式存储,还需要将对应的元数据写入 ZK 来保证数据一致性。
接下来会为当前 topology 生成 task 信息,并记录到 ZK 上(路径: /tasks/${topology_id}
),对于一个 topology 的同一个组件来说,如果并行度大于 1,那么 storm 会为其创建对应数量的 task,并保证 taskId 是连续的,相应实现位于 ServiceHandler#setupZkTaskInfo
方法中:
public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception { // 为当前 topology 追加系统组件,同时基于并行度创建组件对应的 task 信息,同一个组件的多个 task 信息具备连续的 ID Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId); // 获取 topology master 的 ID (这里使用的是其对应的 task ID) int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo); TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId); data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo); // 创建 /ZK/taskbeats/${topology_id},并写入 topologyId 和 topologyMasterId stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo); if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) { throw new InvalidTopologyException("Failed to generate TaskIDs map"); } // key is task id, value is task info // 记录 task 信息到 ZK : /ZK/tasks/${topology_id} stormClusterState.set_task(topologyId, taskToTaskInfo); }
该方法主要做了 3 件事情:
- 为当前 topology 追加系统组件(acker-bolt、master-bolt,以及 system-bolt)
- 为当前 topology 生成 task 分配信息,并记录到 ZK 相应结点
- 为当前 topology 在 ZK 上创建对应的 task 心跳记录文件
其中 1 和 2 位于 ServiceHandler#mkTaskComponentAssignments
方法中:
public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyId) throws IOException, InvalidTopologyException, KeyNotFoundException { // 从 blobstore 中获取当前 topology 的配置信息 Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore()); // 从 blobstore 中获取当前 topology 的 StormTopology 对象 StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore()); // 追加一些系统组件到当前 topology 中 StormTopology topology = Common.system_topology(stormConf, rawTopology); // 为当前 topology 生成 task 信息,key 是 taskId return Common.mkTaskInfo(stormConf, topology, topologyId); }
方法首先会从 blobstore 中获取 topology 的配置信息和 StormTopology 对象,然后调用 Common#system_topology
方法添加一些系统组件,包括 acker-bolt、master-bolt,以及 system-bolt 等。
然后调用 Common#mkTaskInfo
方法为当前 topology 中的各个组件生成 task 分配信息。方法实现比较简单,返回的结果是一个 map 类型,其中 key 是 taskId,对于同一个组件来说为其分配的 taskId 是连续的,value 是对应的 TaskInfo 对象,包含两个字段:componentId 和 componentType。前者对应系统组件 ID 和用户自定义组件 ID,后者对应组件类型,也就是 bolt 和 spout。
完成了 topology 组件 task 分配信息的创建,接下来方法为当前任务创建对应的 TopologyAssignEvent 事件对象,并将事件添加到队列中,等待集群为其分配资源。这一过程位于 ServiceHandler#makeAssignment
方法中,等待的过程采用了 CountDownLatch 机制,count 值设置为 1,并设置 5 分钟上限等待集群分配资源,超时则返回 false 表示本次任务提交失败。
队列的维护和消费过程位于 TopologyAssign 类中,该类实现了 Runnable 接口,并以单例的形式对外提供服务。Nimbus 节点在启动的时候会创建并初始化 TopologyAssign 对象,并以守护线程的方式启动队列的消费过程。线程的 run 方法会循环的从队列头部以阻塞的方式获取对应的 TopologyAssignEvent 事件对象,并调用 TopologyAssign#doTopologyAssignment
方法为相应的 topology 创建任务分配信息(Assignment 对象)和基本运行信息(StormBase 对象),并将任务分配信息和基本运行信息写入 ZK,其中关键的资源分配过程位于 TopologyAssign#mkAssignment
方法中,实现如下:
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception { String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); // 1. 基于配置和当前集群运行状态创建 topology 任务分配的上下文信息 TopologyAssignContext context = this.prepareTopologyAssign(event); // 2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker Set<ResourceWorkerSlot> assignments; if (!StormConfig.local_mode(nimbusData.getConf())) { // 集群模式,获取模式的调度器 ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); // DefaultTopologyScheduler // 为当前 topology 中的 task 分配 worker assignments = scheduler.assignTasks(context); } else { // 本地模式 assignments = mkLocalAssignment(context); } // 3. 记录任务分配信息到 ZK: assignments/${topology_id} Assignment assignment = null; if (assignments != null && assignments.size() > 0) { // 获取服务中的 supervisorId 及其 hostname 映射信息 Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments); // 获取 task 的启动时间:<taskId, start_second> Map<Integer, Integer> startTimes = getTaskStartTimes( context, nimbusData, topologyId, context.getOldAssignment(), assignments); String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR); assignment = new Assignment(codeDir, assignments, nodeHost, startTimes); // the topology binary changed. if (event.isScaleTopology()) { assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology); } StormClusterState stormClusterState = nimbusData.getStormClusterState(); // 写入 assignment 信息到 ZK: assignments/${topology_id} stormClusterState.set_assignment(topologyId, assignment); // update task heartbeat's start time NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId); NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId); LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment); } return assignment; }
TopologyAssign#mkAssignment
方法主要做了下面三件事情:
- 基于配置和当前集群运行状态为当前 topology 创建任务分配的上下文信息
- 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
- 将 topology 的任务分配信息 Assignment 对象记录到 ZK 相应节点上
方法一开始会为当前 topology 创建任务分配的上下文信息 TopologyAssignContext 对象,该对象主要包含一下信息:
- 当前 topology 的 topologyId 和 topologyMasterId
- 当前 topology 对应的 StormTopology 对象
- 配置信息,包括 nimbus 节点配置和 topology 配置
- 当前集群可用的 supervisor 节点信息(不包含位于黑名单中的和已经死亡的)
- 当前 topology 范围内所有 taskId 与其对应的组件 ID 之间的映射关系
- 当前 topology 范围内所有 task 的状态信息
- 其他信息,包括任务分配类型、老的任务分配信息、是否是 reassign,以及未停止的 worker 列表等
完成任务分配的上下文信息创建之后,storm 会基于该信息为当前 topology 分配 worker,集群模式下该过程的实现位于 DefaultTopologyScheduler#assignTasks
方法中,该方法会先计算需要分配的 worker 数目,然后分别为每个 worker 分配对应的 supervisor 节点,最后为 topology 范围内所有组件(包括系统组件)的 task 分配对应的 worker 进程。下面先来看一下 为 worker 分配 supervisor 节点的过程:
public List<ResourceWorkerSlot> getAvailableWorkers( DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) { // 1. 计算需要分配的 worker 数目 int reserveWorkers = context.getReserveWorkerNum(); // 需要保留的 worker 数目 int workersNum = this.getAvailableWorkersNum(context); // 当前集群总的可用的 worker 数目 if ((workersNum - reserveWorkers) < allocWorkerNum) { // 没有足够的 worker 可以分配:可用 worker 数目 - 保留的 worker 数目 < 需要分配的数目 throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum=" + allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers); } workersNum = allocWorkerNum; // 记录分配到的 worker List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>(); // 2. 分配 worker // 2.1 处理用户自定义分配的情况 // 从 needAssign 中移除已经分配的 task,并记录分配的 worker 到 assignedWorkers 中 this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, // 获取用户自定义分配 worker slot 信息,排除状态为 unstopped 的 worker this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf()))); if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { // 2.2 如果配置指定要复用旧的分配,则优先从旧的分配中选出合适的 worker this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers()); } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) { // 2.3 如果是 rebalance 任务分配类型,且可以复用原来的 worker 则将原来分配的 worker 记录下来 int cnt = 0; for (ResourceWorkerSlot worker : context.getOldWorkers()) { if (cnt < workersNum) { ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); resFreeWorker.setPort(worker.getPort()); resFreeWorker.setHostname(worker.getHostname()); resFreeWorker.setNodeId(worker.getNodeId()); assignedWorkers.add(resFreeWorker); cnt++; } else { break; } } } LOG.info("Get workers from user define and old assignments: " + assignedWorkers); int restWorkerNum = workersNum - assignedWorkers.size(); // 还需要分配的 worker 数目 if (restWorkerNum < 0) { throw new FailedAssignTopologyException( "Too many workers are required for user define or old assignments. " + "workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size()); } // 2.4 对于剩下需要的 worker,直接添加 ResourceWorkerSlot 实例对象 for (int i = 0; i < restWorkerNum; i++) { assignedWorkers.add(new ResourceWorkerSlot()); } /* * 3. 遍历将 worker 分配给相应的 supervisor * - 如果 worker 指定了 supervisor,则优先分配给指定 supervisor * - 依据 supervisor 的负载情况优先选择负载较低的进行分配 */ List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context); if (isolationSupervisors.size() != 0) { this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors)); } else { // 为 worker 分配对应的 supervisor this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster())); } this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); LOG.info("Assigned workers=" + assignedWorkers); return assignedWorkers; }
为 worker 分配 supervisor 节点的过程可以概括为:
- 计算需要分配的 worker 数目,如果可用的 worker 数目不满足要求则会抛出异常
-
为需要分配的 worker 创建 ResourceWorkerSlot 分配单元信息,主要分为四种情况:
- 用户自定义 worker slot 分配
- 配置指定复用旧的分配信息则优先从旧的分配中选出合适的 worker slot
- 对于 rebalance 任务分配类型,如果允许则复用原来的 worker slot
- 剩余情况,创建新的 work slot
- 为 worker 分配相应的 supervisor 节点
下面主要来看一下步骤 3,相应实现位于 WorkerScheduler#putAllWorkerToSupervisor
方法中:
private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) { // 遍历处理 worker,如果指定了 supervisor,且 supervisor 存在空闲端口,则将其分配给该 supervisor for (ResourceWorkerSlot worker : assignedWorkers) { if (worker.getHostname() != null) { for (SupervisorInfo supervisor : supervisors) { // 如果当前 worker 对应的 hostname 是该 supervisor,且 supervisor 存在空闲的 worker if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) { /* * 基于当前 supervisor 信息更新对应的 worker 信息: * 1. 保证 worker 对应的端口号是当前 supervisor 空闲的,否则选一个 supervisor 空闲的给 worker * 2. 设置 worker 对应的 nodeId 为当前 supervisor 的 ID */ this.putWorkerToSupervisor(supervisor, worker); break; } } } } // 更新 supervisor 列表,移除没有空闲端口的 supervisor supervisors = this.getResAvailSupervisors(supervisors); // 对 supervisor 按照空闲端口数由大到小排序 Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size()); } }); /* * 按照 supervisor 的负载对 worker 进行分配: * 1. 优先选择负载较低的 supervisor 进分配 * 2. 如果 supervisor 都已经过载但还有未分配的 worker,则从过载 supervisor 优先选择空闲端口较多的进行分配 */ this.putWorkerToSupervisor(assignedWorkers, supervisors); }
如果 worker 指定了 supervisor 节点,则会将其分配给对应的 supervisor,对于剩余的 worker 来说会考虑 supervisor 节点的负载进行分配,以保证集群中 supervisor 负载的均衡性。Storm 依据集群中 supervisor 节点的平均空闲端口数作为标准来衡量 supervisor 节点的负载,如果一个 supervisor 节点的空闲端口数小于该值则认为该 supervisor 过载。集群负载均衡性的保证主要参考以下两个规则:
- 优先选择负载较低的 supervisor 节点进分配
- 如果 supervisor 节点都处于过载状态,但还有未分配的 worker,则从过载 supervisor 节点中优先选择空闲端口较多的节点进行分配
再来看一下为 task 分配 worker 进程的过程,实现位于 TaskScheduler#assign
方法中,该方法按照组件的类别分先后对 task 进行 worker 分配,顺序如下:
task.on.differ.node=true
方法实现如下:
public List<ResourceWorkerSlot> assign() { if (tasks.size() == 0) { // 没有需要再分配的任务 assignments.addAll(this.getRestAssignedWorkers()); return assignments; } // 1. 处理设置了 task.on.differ.node=true 的组件,为其在不同 supervisor 节点上分配 worker Set<Integer> assignedTasks = this.assignForDifferNodeTask(); // 2. 为剩余 task 分配 worker,不包含系统组件 tasks.removeAll(assignedTasks); Map<Integer, String> systemTasks = new HashMap<>(); for (Integer task : tasks) { String name = context.getTaskToComponent().get(task); if (Common.isSystemComponent(name)) { systemTasks.put(task, name); continue; } this.assignForTask(name, task); } // 3. 为系统组件 task 分配 worker, e.g. acker, topology master... for (Entry<Integer, String> entry : systemTasks.entrySet()) { this.assignForTask(entry.getValue(), entry.getKey()); } // 记录所有分配了 task 的 worker 集合 assignments.addAll(this.getRestAssignedWorkers()); return assignments; }
对于设置了 task.on.differ.node=true
的组件,要求名下的 task 需要运行在不同的 supervisor 节点上,所以需要优先进行分配,否则如果一些 supervisor 因为配额已满从资源池移除之后,很可能导致没有足够多的 supervisor 节点来满足此类组件的 task 分配需求。
对于这三类组件的 task 分配过程基本过程类似,基本流程可以概括如下:
- 基于多重选择器为当前 task 选择最优 worker 进行分配
- 将 task 加入到被分配 worker 的 task 列表,并更新 worker 持有的 task 数目
- 检查当前 worker 分配的 task 数目,如果配额已满则将其从资源池移除,不再分配新的 task
- 更新 task 所属组件分配在指定 worker 上的 task 数目
完成了 task 到 worker,以及 worker 到 supervisor 的配置关系,也就相当于完成了对当前 topology 的任务分配过程,紧接着 storm 会将任务分配信息记录到 ZK 对应的任务分配路径下面。需要清楚的一点是当前的分配还只是一个方案,storm 集群并没有开始真正执行当前 topology,如果需要真正启动方案的执行,storm 还需要调度各个 supervisor 节点按照方案启动相应的 worker 进程,并在每个 worker 进程上启动相应数量的线程来执行 task,相应过程我们后面会逐一进行分析。
(本篇完)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何优化生产环境下的Kubernetes资源分配
- 实现动态、弹性和细粒度的资源分配和控制,支持同时加速运行多个模型训练
- 不仅Docker会使用Control Group,KVM也会使用Cgroup来控制资源分配 (1)
- 密歇根州立大学提出NestDNN:动态分配多任务资源的移动端深度学习框架
- 揭开「拓扑排序」的神秘面纱
- 理解 Storm 拓扑的并行度
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。