JStorm 源码解析:拓扑任务的资源分配过程

栏目: 编程工具 · 发布时间: 5年前

内容简介:上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:

上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过 submitTopology 方法向 storm 集群的 nimbus 节点提交任务。Nimbus 以 Thrift RPC 服务的方式运行,相应 thrift 接口方法实现位于 ServiceHandler 类中,下面我们从 ServiceHandler#submitTopology 方法切入,分析 nimbus 节点之于客户端提交任务的资源分配过程,该方法包装了 ServiceHandler#submitTopologyWithOpts 方法。

Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。 ServiceHandler#submitTopologyWithOpts 方法统一处理这三种情况,但是不管哪种提交方式都会首先验证 topology 名称和配置的合法性,然后基于具体提交类型分而治之。

灰度发布 & 热部署

首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:

// 获取指定 topology 的运行数据
TopologyInfo topologyInfo = this.getTopologyInfo(topologyId);
if (topologyInfo == null) {
    throw new TException("Failed to get topology info");
}

// 获取指定的 worker 数目:${topology.upgrade.worker.num}
int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf);
// 获取指定的组件名称:${topology.upgrade.component}
String component = ConfigExtension.getUpgradeComponent(serializedConf);
// 获取指定的 worker 列表:${topology.upgrade.workers}
Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf);

// 判定 topology master 是不是使用独立的 worker
if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) {
    throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!");
}

// 灰度发布
return this.grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);

对于允许灰度发布的场景,storm 会基于当前提交 topology 的配置首先会尝试获取以下 3 个参数用于挑选 worker 进行发布:

  • topology.upgrade.worker.num
  • topology.upgrade.component
  • topology.upgrade.workers

如果同时指定了多个参数,方法会基于一定的优先级进行决策,具体如下:

  • 如果参数 topology.upgrade.workers 不为空则忽略其他参数,挑选指定的 worker 进行发布,需要注意的是这些 worker 发布完之后,这个参数就自动置空
  • 否则查看参数 topology.upgrade.component 是否为空,如果不为空还需要查看参数 topology.upgrade.worker.num 是否为 0, 如果不为 0 则挑选指定工作组件下 topology.upgrade.worker.num 指定数目的 worker 进行发布,否则对这些工作组件下所有 worker 进行发布
  • 如果上面两个都为空,则随机挑选 topology.upgrade.worker.num 个 worker 进行发布

灰度发布的具体执行流程位于 ServiceHandler#grayUpgrade 方法中,该方法实现比较冗长,故不在此贴出,下面参考源码和官网文档对发布的过程进行说明:

  1. 方法首先尝试从 ZK 获取当前 topology 对应的基本信息(路径: /topology/${topology_id} )和灰度发布信息(路径: /gray_upgrade/${topology_id} ),以及任务分配信息(路径: assignments/${topology_id} )。
  2. 如果存在灰度发布信息,则判断对应的灰度状态(已过期 / 已完成 / 进行中),如果正在灰度中则拒绝本次灰度请求,否则(包含不存在灰度发布信息的情况)继续执行灰度发布。
  3. 方法利用 GrayUpgradeConfig 对象封装灰度发布信息,并写入到 ZK 的 /gray_upgrade/${topology_id} 路径下,同时设置 config.continueUpgrading=true
  4. Topology Master 有一个线程 GrayUpgradeHandler 会定时读取该节点的配置,检测到有灰度发布配置且 continueUpgrading=true 时,将分配指定数目的 worker,添加到 ZK 的 /gray_upgrade/${topology_id}/upgrading_workers 路径下,并设置 continueUpgrading=false (防止自动进行后续的灰度发布)。
  5. SyncSupervisorEvent 会定时检查每个拓扑的 upgrading_workers 节点,一旦有数据就和自身的 IP 和端口列表进行对比,如果有属于该 supervisor 节点的灰度发布就下载最新的 storm-code 和 storm-jar,然后重启 worker,同时将 worker 添加到 ZK 的 upgraded_workers 节点下。
  6. GrayUpgradeHandler 检测 ZK,如果 upgraded_workers 的 worker 数大于等于当前总 worker 数减 1(topology master 组件占用),则认为此次灰度发布已经完成,删除 ZK 上的灰度发布配置、upgrading_workers,以及 upgraded_workers。

如果只想升级部分 worker 或特定组件,可以用 complete_upgrade 强制完成升级。灰度发布过程中使用单独的 upgrading_workers 和 upgraded_workers 的设计主要是为了避免同步问题。如果将这些信息写在 GrayUpgradeConfig 类中可能会涉及到多个 supervisor 节点同时更新 workers 的情况,而使用单独的节点则只需要在这个节点下添加和删除子节点,不会有同步问题。

热部署和灰度发布从形式上来看都是对运行中 topology 的更新替换操作,但是对于 nimbus 来说,在处理上却是两条不同的分支,实际上热部署与新任务提交在处理过程上更加形似,毕竟热部署的过程就是杀死处于运行中的 topology 然后执行新任务提交的过程,所以接下来我们主要分析新任务的调度细节。

新任务提交

对于新提交的任务来说,storm 会为该 topology 执行一些准备和验证工作,并在 ZK 上创建相应的结点记录该 topology 的元数据和任务分配信息,然后为该 topology 生成一个事件提交给任务分配队列等待 nimbus 节点为当前 topology 制定运行方案,并在执行成功后发送相应的通知事件。相关实现如下:

// 对当前 topology 配置进行规范化,并附加一些必要的配置
Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);

// 合并集群配置和拓扑配置
Map<Object, Object> totalStormConf = new HashMap<>(conf);
totalStormConf.putAll(stormConf);

// 确定 topology 中各个组件的并行度,保证不超过当前 topology 允许的最大值
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);

/*
 * 验证 topology 的基本结构信息:
 * 1. 验证 topologyName,组件 ID 是否合法
 * 2. 验证是否存在缺失 input 声明的 spout
 * 3. 验证 woker 和 acker 数目参数配置
 */
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);

StormClusterState stormClusterState = data.getStormClusterState();

// 创建 /local-dir/nimbus/${topology_id}/xxxx 文件,并将元数据同步到 ZK
this.setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false);

// wait for blob replication before activate topology
this.waitForDesiredCodeReplication(conf, topologyId);

// generate TaskInfo for every bolt or spout in ZK : /ZK/tasks/topoologyId/xxx
// 为当前 topology 在 ZK 上生成 task 信息:/tasks/${topology_id}
this.setupZkTaskInfo(conf, topologyId, stormClusterState);

// mkdir topology error directory : taskerrors/${topology_id}
String path = Cluster.taskerror_storm_root(topologyId);
stormClusterState.mkdir(path);

String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); // gray_upgrade/${topology_id}
stormClusterState.mkdir(grayUpgradeBasePath);
// gray_upgrade/${topology_id}/upgraded_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId));
// gray_upgrade/${topology_id}/upgrading_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId));

// 为当前 topology 执行任务分配
LOG.info("Submit topology {} with conf {}", topologyName, serializedConf);
this.makeAssignment(topologyName, topologyId, options.get_initial_status());

// push start event after startup
double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); // ${topology.metric.sample.rate},默认是 0.05
StartTopologyEvent.pushEvent(topologyId, metricsSampleRate);

this.notifyTopologyActionListener(topologyName, "submitTopology");

下面对源码中涉及到的相关流程进行进一步分析。对于新提交的任务首先会执行一些准备工作,包括:

  1. 规范化 topology 的配置信息
  2. 确定 topology 各个组件的并行度,保证不超过允许的最大值
  3. 验证 topology 的基本结构信息(topology 名称和组件 ID 的合法性、spout 是否缺失 input 声明,以及验证 worker 和 acker 的参数配置等)

然后 storm 会创建或更新当前 topology 对象的序列化文件(stormcode.ser)和配置信息文件(stormconf.ser)到 blobstore 中,如果是采用 nimbus 本地模式存储,还需要将对应的元数据写入 ZK 来保证数据一致性。

接下来会为当前 topology 生成 task 信息,并记录到 ZK 上(路径: /tasks/${topology_id} ),对于一个 topology 的同一个组件来说,如果并行度大于 1,那么 storm 会为其创建对应数量的 task,并保证 taskId 是连续的,相应实现位于 ServiceHandler#setupZkTaskInfo 方法中:

public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
    // 为当前 topology 追加系统组件,同时基于并行度创建组件对应的 task 信息,同一个组件的多个 task 信息具备连续的 ID
    Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId);

    // 获取 topology master 的 ID (这里使用的是其对应的 task ID)
    int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
    TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId);
    data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo);
    // 创建 /ZK/taskbeats/${topology_id},并写入 topologyId 和 topologyMasterId
    stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo);

    if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
        throw new InvalidTopologyException("Failed to generate TaskIDs map");
    }
    // key is task id, value is task info
    // 记录 task 信息到 ZK : /ZK/tasks/${topology_id}
    stormClusterState.set_task(topologyId, taskToTaskInfo);
}

该方法主要做了 3 件事情:

  1. 为当前 topology 追加系统组件(acker-bolt、master-bolt,以及 system-bolt)
  2. 为当前 topology 生成 task 分配信息,并记录到 ZK 相应结点
  3. 为当前 topology 在 ZK 上创建对应的 task 心跳记录文件

其中 1 和 2 位于 ServiceHandler#mkTaskComponentAssignments 方法中:

public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyId)
        throws IOException, InvalidTopologyException, KeyNotFoundException {
    // 从 blobstore 中获取当前 topology 的配置信息
    Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore());
    // 从 blobstore 中获取当前 topology 的 StormTopology 对象
    StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore());
    // 追加一些系统组件到当前 topology 中
    StormTopology topology = Common.system_topology(stormConf, rawTopology);
    // 为当前 topology 生成 task 信息,key 是 taskId
    return Common.mkTaskInfo(stormConf, topology, topologyId);
}

方法首先会从 blobstore 中获取 topology 的配置信息和 StormTopology 对象,然后调用 Common#system_topology 方法添加一些系统组件,包括 acker-bolt、master-bolt,以及 system-bolt 等。

然后调用 Common#mkTaskInfo 方法为当前 topology 中的各个组件生成 task 分配信息。方法实现比较简单,返回的结果是一个 map 类型,其中 key 是 taskId,对于同一个组件来说为其分配的 taskId 是连续的,value 是对应的 TaskInfo 对象,包含两个字段:componentId 和 componentType。前者对应系统组件 ID 和用户自定义组件 ID,后者对应组件类型,也就是 bolt 和 spout。

完成了 topology 组件 task 分配信息的创建,接下来方法为当前任务创建对应的 TopologyAssignEvent 事件对象,并将事件添加到队列中,等待集群为其分配资源。这一过程位于 ServiceHandler#makeAssignment 方法中,等待的过程采用了 CountDownLatch 机制,count 值设置为 1,并设置 5 分钟上限等待集群分配资源,超时则返回 false 表示本次任务提交失败。

队列的维护和消费过程位于 TopologyAssign 类中,该类实现了 Runnable 接口,并以单例的形式对外提供服务。Nimbus 节点在启动的时候会创建并初始化 TopologyAssign 对象,并以守护线程的方式启动队列的消费过程。线程的 run 方法会循环的从队列头部以阻塞的方式获取对应的 TopologyAssignEvent 事件对象,并调用 TopologyAssign#doTopologyAssignment 方法为相应的 topology 创建任务分配信息(Assignment 对象)和基本运行信息(StormBase 对象),并将任务分配信息和基本运行信息写入 ZK,其中关键的资源分配过程位于 TopologyAssign#mkAssignment 方法中,实现如下:

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
    String topologyId = event.getTopologyId();
    LOG.info("Determining assignment for " + topologyId);

    // 1. 基于配置和当前集群运行状态创建 topology 任务分配的上下文信息
    TopologyAssignContext context = this.prepareTopologyAssign(event);

    // 2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
    Set<ResourceWorkerSlot> assignments;
    if (!StormConfig.local_mode(nimbusData.getConf())) {
        // 集群模式,获取模式的调度器
        ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); // DefaultTopologyScheduler
        // 为当前 topology 中的 task 分配 worker
        assignments = scheduler.assignTasks(context);
    } else {
        // 本地模式
        assignments = mkLocalAssignment(context);
    }

    // 3. 记录任务分配信息到 ZK: assignments/${topology_id}
    Assignment assignment = null;
    if (assignments != null && assignments.size() > 0) {
        // 获取服务中的 supervisorId 及其 hostname 映射信息
        Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
        // 获取 task 的启动时间:<taskId, start_second>
        Map<Integer, Integer> startTimes = getTaskStartTimes(
                context, nimbusData, topologyId, context.getOldAssignment(), assignments);

        String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR);

        assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

        //  the topology binary changed.
        if (event.isScaleTopology()) {
            assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
        }
        StormClusterState stormClusterState = nimbusData.getStormClusterState();
        // 写入 assignment 信息到 ZK: assignments/${topology_id}
        stormClusterState.set_assignment(topologyId, assignment);

        // update task heartbeat's start time
        NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
        NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);

        LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
    }
    return assignment;
}

TopologyAssign#mkAssignment 方法主要做了下面三件事情:

  1. 基于配置和当前集群运行状态为当前 topology 创建任务分配的上下文信息
  2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
  3. 将 topology 的任务分配信息 Assignment 对象记录到 ZK 相应节点上

方法一开始会为当前 topology 创建任务分配的上下文信息 TopologyAssignContext 对象,该对象主要包含一下信息:

  • 当前 topology 的 topologyId 和 topologyMasterId
  • 当前 topology 对应的 StormTopology 对象
  • 配置信息,包括 nimbus 节点配置和 topology 配置
  • 当前集群可用的 supervisor 节点信息(不包含位于黑名单中的和已经死亡的)
  • 当前 topology 范围内所有 taskId 与其对应的组件 ID 之间的映射关系
  • 当前 topology 范围内所有 task 的状态信息
  • 其他信息,包括任务分配类型、老的任务分配信息、是否是 reassign,以及未停止的 worker 列表等

完成任务分配的上下文信息创建之后,storm 会基于该信息为当前 topology 分配 worker,集群模式下该过程的实现位于 DefaultTopologyScheduler#assignTasks 方法中,该方法会先计算需要分配的 worker 数目,然后分别为每个 worker 分配对应的 supervisor 节点,最后为 topology 范围内所有组件(包括系统组件)的 task 分配对应的 worker 进程。下面先来看一下 为 worker 分配 supervisor 节点的过程:

public List<ResourceWorkerSlot> getAvailableWorkers(
        DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) {

    // 1. 计算需要分配的 worker 数目
    int reserveWorkers = context.getReserveWorkerNum(); // 需要保留的 worker 数目
    int workersNum = this.getAvailableWorkersNum(context); // 当前集群总的可用的 worker 数目
    if ((workersNum - reserveWorkers) < allocWorkerNum) {
        // 没有足够的 worker 可以分配:可用 worker 数目 - 保留的 worker 数目 < 需要分配的数目
        throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum="
                + allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers);
    }
    workersNum = allocWorkerNum;

    // 记录分配到的 worker
    List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>();

    // 2. 分配 worker

    // 2.1 处理用户自定义分配的情况
    // 从 needAssign 中移除已经分配的 task,并记录分配的 worker 到 assignedWorkers 中
    this.getRightWorkers(context, needAssign, assignedWorkers, workersNum,
            // 获取用户自定义分配 worker slot 信息,排除状态为 unstopped 的 worker
            this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

    if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
        // 2.2 如果配置指定要复用旧的分配,则优先从旧的分配中选出合适的 worker
        this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers());
    } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) {
        // 2.3 如果是 rebalance 任务分配类型,且可以复用原来的 worker 则将原来分配的 worker 记录下来
        int cnt = 0;
        for (ResourceWorkerSlot worker : context.getOldWorkers()) {
            if (cnt < workersNum) {
                ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                resFreeWorker.setPort(worker.getPort());
                resFreeWorker.setHostname(worker.getHostname());
                resFreeWorker.setNodeId(worker.getNodeId());
                assignedWorkers.add(resFreeWorker);
                cnt++;
            } else {
                break;
            }
        }
    }

    LOG.info("Get workers from user define and old assignments: " + assignedWorkers);

    int restWorkerNum = workersNum - assignedWorkers.size(); // 还需要分配的 worker 数目
    if (restWorkerNum < 0) {
        throw new FailedAssignTopologyException(
                "Too many workers are required for user define or old assignments. " +
                        "workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size());
    }

    // 2.4 对于剩下需要的 worker,直接添加 ResourceWorkerSlot 实例对象
    for (int i = 0; i < restWorkerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }

    /*
     * 3. 遍历将 worker 分配给相应的 supervisor
     * - 如果 worker 指定了 supervisor,则优先分配给指定 supervisor
     * - 依据 supervisor 的负载情况优先选择负载较低的进行分配
     */
    List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors));
    } else {
        // 为 worker 分配对应的 supervisor
        this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;
}

为 worker 分配 supervisor 节点的过程可以概括为:

  1. 计算需要分配的 worker 数目,如果可用的 worker 数目不满足要求则会抛出异常
  2. 为需要分配的 worker 创建 ResourceWorkerSlot 分配单元信息,主要分为四种情况:
    • 用户自定义 worker slot 分配
    • 配置指定复用旧的分配信息则优先从旧的分配中选出合适的 worker slot
    • 对于 rebalance 任务分配类型,如果允许则复用原来的 worker slot
    • 剩余情况,创建新的 work slot
  3. 为 worker 分配相应的 supervisor 节点

下面主要来看一下步骤 3,相应实现位于 WorkerScheduler#putAllWorkerToSupervisor 方法中:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {

    // 遍历处理 worker,如果指定了 supervisor,且 supervisor 存在空闲端口,则将其分配给该 supervisor
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                // 如果当前 worker 对应的 hostname 是该 supervisor,且 supervisor 存在空闲的 worker
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname())
                        && supervisor.getAvailableWorkerPorts().size() > 0) {
                    /*
                     * 基于当前 supervisor 信息更新对应的 worker 信息:
                     *  1. 保证 worker 对应的端口号是当前 supervisor 空闲的,否则选一个 supervisor 空闲的给 worker
                     *  2. 设置 worker 对应的 nodeId 为当前 supervisor 的 ID
                     */
                    this.putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }

    // 更新 supervisor 列表,移除没有空闲端口的 supervisor
    supervisors = this.getResAvailSupervisors(supervisors);

    // 对 supervisor 按照空闲端口数由大到小排序
    Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

        @Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }

    });

    /*
     * 按照 supervisor 的负载对 worker 进行分配:
     * 1. 优先选择负载较低的 supervisor 进分配
     * 2. 如果 supervisor 都已经过载但还有未分配的 worker,则从过载 supervisor 优先选择空闲端口较多的进行分配
     */
    this.putWorkerToSupervisor(assignedWorkers, supervisors);
}

如果 worker 指定了 supervisor 节点,则会将其分配给对应的 supervisor,对于剩余的 worker 来说会考虑 supervisor 节点的负载进行分配,以保证集群中 supervisor 负载的均衡性。Storm 依据集群中 supervisor 节点的平均空闲端口数作为标准来衡量 supervisor 节点的负载,如果一个 supervisor 节点的空闲端口数小于该值则认为该 supervisor 过载。集群负载均衡性的保证主要参考以下两个规则:

  1. 优先选择负载较低的 supervisor 节点进分配
  2. 如果 supervisor 节点都处于过载状态,但还有未分配的 worker,则从过载 supervisor 节点中优先选择空闲端口较多的节点进行分配

再来看一下为 task 分配 worker 进程的过程,实现位于 TaskScheduler#assign 方法中,该方法按照组件的类别分先后对 task 进行 worker 分配,顺序如下:

task.on.differ.node=true

方法实现如下:

public List<ResourceWorkerSlot> assign() {
    if (tasks.size() == 0) { // 没有需要再分配的任务
        assignments.addAll(this.getRestAssignedWorkers());
        return assignments;
    }

    // 1. 处理设置了 task.on.differ.node=true 的组件,为其在不同 supervisor 节点上分配 worker
    Set<Integer> assignedTasks = this.assignForDifferNodeTask();

    // 2. 为剩余 task 分配 worker,不包含系统组件
    tasks.removeAll(assignedTasks);
    Map<Integer, String> systemTasks = new HashMap<>();
    for (Integer task : tasks) {
        String name = context.getTaskToComponent().get(task);
        if (Common.isSystemComponent(name)) {
            systemTasks.put(task, name);
            continue;
        }
        this.assignForTask(name, task);
    }

    // 3. 为系统组件 task 分配 worker, e.g. acker, topology master...
    for (Entry<Integer, String> entry : systemTasks.entrySet()) {
        this.assignForTask(entry.getValue(), entry.getKey());
    }

    // 记录所有分配了 task 的 worker 集合
    assignments.addAll(this.getRestAssignedWorkers());
    return assignments;
}

对于设置了 task.on.differ.node=true 的组件,要求名下的 task 需要运行在不同的 supervisor 节点上,所以需要优先进行分配,否则如果一些 supervisor 因为配额已满从资源池移除之后,很可能导致没有足够多的 supervisor 节点来满足此类组件的 task 分配需求。

对于这三类组件的 task 分配过程基本过程类似,基本流程可以概括如下:

  1. 基于多重选择器为当前 task 选择最优 worker 进行分配
  2. 将 task 加入到被分配 worker 的 task 列表,并更新 worker 持有的 task 数目
  3. 检查当前 worker 分配的 task 数目,如果配额已满则将其从资源池移除,不再分配新的 task
  4. 更新 task 所属组件分配在指定 worker 上的 task 数目

完成了 task 到 worker,以及 worker 到 supervisor 的配置关系,也就相当于完成了对当前 topology 的任务分配过程,紧接着 storm 会将任务分配信息记录到 ZK 对应的任务分配路径下面。需要清楚的一点是当前的分配还只是一个方案,storm 集群并没有开始真正执行当前 topology,如果需要真正启动方案的执行,storm 还需要调度各个 supervisor 节点按照方案启动相应的 worker 进程,并在每个 worker 进程上启动相应数量的线程来执行 task,相应过程我们后面会逐一进行分析。

(本篇完)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

中国机器人

中国机器人

[中]王鸿鹏、[中]马娜 / 辽宁人民出版社 / 2017-1-1 / 48.00元

本书对中国机器人领域的发展历史做了引人入胜的介绍,中国机器人成长的过程也是中国经济由弱到强的历程。本书实际是选择了一个独特的视角来解读中国数十年的政治、经济、国家战略问题。中国的未来充满了多重可能性,本书对想了解中国当代与未来发展战略的读者是难得的读本,对智能制造这一当今世界*受关注的高科技领域在战略层面和科技伦理层面进行了深入地剖析和思考,其中提出的诸多前沿性观点是全球都将面对的问题,对中国科学......一起来看看 《中国机器人》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具