JStorm 源码解析:拓扑任务的资源分配过程

栏目: 编程工具 · 发布时间: 6年前

内容简介:上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:

上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过 submitTopology 方法向 storm 集群的 nimbus 节点提交任务。Nimbus 以 Thrift RPC 服务的方式运行,相应 thrift 接口方法实现位于 ServiceHandler 类中,下面我们从 ServiceHandler#submitTopology 方法切入,分析 nimbus 节点之于客户端提交任务的资源分配过程,该方法包装了 ServiceHandler#submitTopologyWithOpts 方法。

Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。 ServiceHandler#submitTopologyWithOpts 方法统一处理这三种情况,但是不管哪种提交方式都会首先验证 topology 名称和配置的合法性,然后基于具体提交类型分而治之。

灰度发布 & 热部署

首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:

// 获取指定 topology 的运行数据
TopologyInfo topologyInfo = this.getTopologyInfo(topologyId);
if (topologyInfo == null) {
    throw new TException("Failed to get topology info");
}

// 获取指定的 worker 数目:${topology.upgrade.worker.num}
int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf);
// 获取指定的组件名称:${topology.upgrade.component}
String component = ConfigExtension.getUpgradeComponent(serializedConf);
// 获取指定的 worker 列表:${topology.upgrade.workers}
Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf);

// 判定 topology master 是不是使用独立的 worker
if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) {
    throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!");
}

// 灰度发布
return this.grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);

对于允许灰度发布的场景,storm 会基于当前提交 topology 的配置首先会尝试获取以下 3 个参数用于挑选 worker 进行发布:

  • topology.upgrade.worker.num
  • topology.upgrade.component
  • topology.upgrade.workers

如果同时指定了多个参数,方法会基于一定的优先级进行决策,具体如下:

  • 如果参数 topology.upgrade.workers 不为空则忽略其他参数,挑选指定的 worker 进行发布,需要注意的是这些 worker 发布完之后,这个参数就自动置空
  • 否则查看参数 topology.upgrade.component 是否为空,如果不为空还需要查看参数 topology.upgrade.worker.num 是否为 0, 如果不为 0 则挑选指定工作组件下 topology.upgrade.worker.num 指定数目的 worker 进行发布,否则对这些工作组件下所有 worker 进行发布
  • 如果上面两个都为空,则随机挑选 topology.upgrade.worker.num 个 worker 进行发布

灰度发布的具体执行流程位于 ServiceHandler#grayUpgrade 方法中,该方法实现比较冗长,故不在此贴出,下面参考源码和官网文档对发布的过程进行说明:

  1. 方法首先尝试从 ZK 获取当前 topology 对应的基本信息(路径: /topology/${topology_id} )和灰度发布信息(路径: /gray_upgrade/${topology_id} ),以及任务分配信息(路径: assignments/${topology_id} )。
  2. 如果存在灰度发布信息,则判断对应的灰度状态(已过期 / 已完成 / 进行中),如果正在灰度中则拒绝本次灰度请求,否则(包含不存在灰度发布信息的情况)继续执行灰度发布。
  3. 方法利用 GrayUpgradeConfig 对象封装灰度发布信息,并写入到 ZK 的 /gray_upgrade/${topology_id} 路径下,同时设置 config.continueUpgrading=true
  4. Topology Master 有一个线程 GrayUpgradeHandler 会定时读取该节点的配置,检测到有灰度发布配置且 continueUpgrading=true 时,将分配指定数目的 worker,添加到 ZK 的 /gray_upgrade/${topology_id}/upgrading_workers 路径下,并设置 continueUpgrading=false (防止自动进行后续的灰度发布)。
  5. SyncSupervisorEvent 会定时检查每个拓扑的 upgrading_workers 节点,一旦有数据就和自身的 IP 和端口列表进行对比,如果有属于该 supervisor 节点的灰度发布就下载最新的 storm-code 和 storm-jar,然后重启 worker,同时将 worker 添加到 ZK 的 upgraded_workers 节点下。
  6. GrayUpgradeHandler 检测 ZK,如果 upgraded_workers 的 worker 数大于等于当前总 worker 数减 1(topology master 组件占用),则认为此次灰度发布已经完成,删除 ZK 上的灰度发布配置、upgrading_workers,以及 upgraded_workers。

如果只想升级部分 worker 或特定组件,可以用 complete_upgrade 强制完成升级。灰度发布过程中使用单独的 upgrading_workers 和 upgraded_workers 的设计主要是为了避免同步问题。如果将这些信息写在 GrayUpgradeConfig 类中可能会涉及到多个 supervisor 节点同时更新 workers 的情况,而使用单独的节点则只需要在这个节点下添加和删除子节点,不会有同步问题。

热部署和灰度发布从形式上来看都是对运行中 topology 的更新替换操作,但是对于 nimbus 来说,在处理上却是两条不同的分支,实际上热部署与新任务提交在处理过程上更加形似,毕竟热部署的过程就是杀死处于运行中的 topology 然后执行新任务提交的过程,所以接下来我们主要分析新任务的调度细节。

新任务提交

对于新提交的任务来说,storm 会为该 topology 执行一些准备和验证工作,并在 ZK 上创建相应的结点记录该 topology 的元数据和任务分配信息,然后为该 topology 生成一个事件提交给任务分配队列等待 nimbus 节点为当前 topology 制定运行方案,并在执行成功后发送相应的通知事件。相关实现如下:

// 对当前 topology 配置进行规范化,并附加一些必要的配置
Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);

// 合并集群配置和拓扑配置
Map<Object, Object> totalStormConf = new HashMap<>(conf);
totalStormConf.putAll(stormConf);

// 确定 topology 中各个组件的并行度,保证不超过当前 topology 允许的最大值
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);

/*
 * 验证 topology 的基本结构信息:
 * 1. 验证 topologyName,组件 ID 是否合法
 * 2. 验证是否存在缺失 input 声明的 spout
 * 3. 验证 woker 和 acker 数目参数配置
 */
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);

StormClusterState stormClusterState = data.getStormClusterState();

// 创建 /local-dir/nimbus/${topology_id}/xxxx 文件,并将元数据同步到 ZK
this.setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false);

// wait for blob replication before activate topology
this.waitForDesiredCodeReplication(conf, topologyId);

// generate TaskInfo for every bolt or spout in ZK : /ZK/tasks/topoologyId/xxx
// 为当前 topology 在 ZK 上生成 task 信息:/tasks/${topology_id}
this.setupZkTaskInfo(conf, topologyId, stormClusterState);

// mkdir topology error directory : taskerrors/${topology_id}
String path = Cluster.taskerror_storm_root(topologyId);
stormClusterState.mkdir(path);

String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); // gray_upgrade/${topology_id}
stormClusterState.mkdir(grayUpgradeBasePath);
// gray_upgrade/${topology_id}/upgraded_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId));
// gray_upgrade/${topology_id}/upgrading_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId));

// 为当前 topology 执行任务分配
LOG.info("Submit topology {} with conf {}", topologyName, serializedConf);
this.makeAssignment(topologyName, topologyId, options.get_initial_status());

// push start event after startup
double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); // ${topology.metric.sample.rate},默认是 0.05
StartTopologyEvent.pushEvent(topologyId, metricsSampleRate);

this.notifyTopologyActionListener(topologyName, "submitTopology");

下面对源码中涉及到的相关流程进行进一步分析。对于新提交的任务首先会执行一些准备工作,包括:

  1. 规范化 topology 的配置信息
  2. 确定 topology 各个组件的并行度,保证不超过允许的最大值
  3. 验证 topology 的基本结构信息(topology 名称和组件 ID 的合法性、spout 是否缺失 input 声明,以及验证 worker 和 acker 的参数配置等)

然后 storm 会创建或更新当前 topology 对象的序列化文件(stormcode.ser)和配置信息文件(stormconf.ser)到 blobstore 中,如果是采用 nimbus 本地模式存储,还需要将对应的元数据写入 ZK 来保证数据一致性。

接下来会为当前 topology 生成 task 信息,并记录到 ZK 上(路径: /tasks/${topology_id} ),对于一个 topology 的同一个组件来说,如果并行度大于 1,那么 storm 会为其创建对应数量的 task,并保证 taskId 是连续的,相应实现位于 ServiceHandler#setupZkTaskInfo 方法中:

public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
    // 为当前 topology 追加系统组件,同时基于并行度创建组件对应的 task 信息,同一个组件的多个 task 信息具备连续的 ID
    Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId);

    // 获取 topology master 的 ID (这里使用的是其对应的 task ID)
    int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
    TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId);
    data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo);
    // 创建 /ZK/taskbeats/${topology_id},并写入 topologyId 和 topologyMasterId
    stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo);

    if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
        throw new InvalidTopologyException("Failed to generate TaskIDs map");
    }
    // key is task id, value is task info
    // 记录 task 信息到 ZK : /ZK/tasks/${topology_id}
    stormClusterState.set_task(topologyId, taskToTaskInfo);
}

该方法主要做了 3 件事情:

  1. 为当前 topology 追加系统组件(acker-bolt、master-bolt,以及 system-bolt)
  2. 为当前 topology 生成 task 分配信息,并记录到 ZK 相应结点
  3. 为当前 topology 在 ZK 上创建对应的 task 心跳记录文件

其中 1 和 2 位于 ServiceHandler#mkTaskComponentAssignments 方法中:

public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyId)
        throws IOException, InvalidTopologyException, KeyNotFoundException {
    // 从 blobstore 中获取当前 topology 的配置信息
    Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore());
    // 从 blobstore 中获取当前 topology 的 StormTopology 对象
    StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore());
    // 追加一些系统组件到当前 topology 中
    StormTopology topology = Common.system_topology(stormConf, rawTopology);
    // 为当前 topology 生成 task 信息,key 是 taskId
    return Common.mkTaskInfo(stormConf, topology, topologyId);
}

方法首先会从 blobstore 中获取 topology 的配置信息和 StormTopology 对象,然后调用 Common#system_topology 方法添加一些系统组件,包括 acker-bolt、master-bolt,以及 system-bolt 等。

然后调用 Common#mkTaskInfo 方法为当前 topology 中的各个组件生成 task 分配信息。方法实现比较简单,返回的结果是一个 map 类型,其中 key 是 taskId,对于同一个组件来说为其分配的 taskId 是连续的,value 是对应的 TaskInfo 对象,包含两个字段:componentId 和 componentType。前者对应系统组件 ID 和用户自定义组件 ID,后者对应组件类型,也就是 bolt 和 spout。

完成了 topology 组件 task 分配信息的创建,接下来方法为当前任务创建对应的 TopologyAssignEvent 事件对象,并将事件添加到队列中,等待集群为其分配资源。这一过程位于 ServiceHandler#makeAssignment 方法中,等待的过程采用了 CountDownLatch 机制,count 值设置为 1,并设置 5 分钟上限等待集群分配资源,超时则返回 false 表示本次任务提交失败。

队列的维护和消费过程位于 TopologyAssign 类中,该类实现了 Runnable 接口,并以单例的形式对外提供服务。Nimbus 节点在启动的时候会创建并初始化 TopologyAssign 对象,并以守护线程的方式启动队列的消费过程。线程的 run 方法会循环的从队列头部以阻塞的方式获取对应的 TopologyAssignEvent 事件对象,并调用 TopologyAssign#doTopologyAssignment 方法为相应的 topology 创建任务分配信息(Assignment 对象)和基本运行信息(StormBase 对象),并将任务分配信息和基本运行信息写入 ZK,其中关键的资源分配过程位于 TopologyAssign#mkAssignment 方法中,实现如下:

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
    String topologyId = event.getTopologyId();
    LOG.info("Determining assignment for " + topologyId);

    // 1. 基于配置和当前集群运行状态创建 topology 任务分配的上下文信息
    TopologyAssignContext context = this.prepareTopologyAssign(event);

    // 2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
    Set<ResourceWorkerSlot> assignments;
    if (!StormConfig.local_mode(nimbusData.getConf())) {
        // 集群模式,获取模式的调度器
        ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); // DefaultTopologyScheduler
        // 为当前 topology 中的 task 分配 worker
        assignments = scheduler.assignTasks(context);
    } else {
        // 本地模式
        assignments = mkLocalAssignment(context);
    }

    // 3. 记录任务分配信息到 ZK: assignments/${topology_id}
    Assignment assignment = null;
    if (assignments != null && assignments.size() > 0) {
        // 获取服务中的 supervisorId 及其 hostname 映射信息
        Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
        // 获取 task 的启动时间:<taskId, start_second>
        Map<Integer, Integer> startTimes = getTaskStartTimes(
                context, nimbusData, topologyId, context.getOldAssignment(), assignments);

        String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR);

        assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

        //  the topology binary changed.
        if (event.isScaleTopology()) {
            assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
        }
        StormClusterState stormClusterState = nimbusData.getStormClusterState();
        // 写入 assignment 信息到 ZK: assignments/${topology_id}
        stormClusterState.set_assignment(topologyId, assignment);

        // update task heartbeat's start time
        NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
        NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);

        LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
    }
    return assignment;
}

TopologyAssign#mkAssignment 方法主要做了下面三件事情:

  1. 基于配置和当前集群运行状态为当前 topology 创建任务分配的上下文信息
  2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
  3. 将 topology 的任务分配信息 Assignment 对象记录到 ZK 相应节点上

方法一开始会为当前 topology 创建任务分配的上下文信息 TopologyAssignContext 对象,该对象主要包含一下信息:

  • 当前 topology 的 topologyId 和 topologyMasterId
  • 当前 topology 对应的 StormTopology 对象
  • 配置信息,包括 nimbus 节点配置和 topology 配置
  • 当前集群可用的 supervisor 节点信息(不包含位于黑名单中的和已经死亡的)
  • 当前 topology 范围内所有 taskId 与其对应的组件 ID 之间的映射关系
  • 当前 topology 范围内所有 task 的状态信息
  • 其他信息,包括任务分配类型、老的任务分配信息、是否是 reassign,以及未停止的 worker 列表等

完成任务分配的上下文信息创建之后,storm 会基于该信息为当前 topology 分配 worker,集群模式下该过程的实现位于 DefaultTopologyScheduler#assignTasks 方法中,该方法会先计算需要分配的 worker 数目,然后分别为每个 worker 分配对应的 supervisor 节点,最后为 topology 范围内所有组件(包括系统组件)的 task 分配对应的 worker 进程。下面先来看一下 为 worker 分配 supervisor 节点的过程:

public List<ResourceWorkerSlot> getAvailableWorkers(
        DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) {

    // 1. 计算需要分配的 worker 数目
    int reserveWorkers = context.getReserveWorkerNum(); // 需要保留的 worker 数目
    int workersNum = this.getAvailableWorkersNum(context); // 当前集群总的可用的 worker 数目
    if ((workersNum - reserveWorkers) < allocWorkerNum) {
        // 没有足够的 worker 可以分配:可用 worker 数目 - 保留的 worker 数目 < 需要分配的数目
        throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum="
                + allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers);
    }
    workersNum = allocWorkerNum;

    // 记录分配到的 worker
    List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>();

    // 2. 分配 worker

    // 2.1 处理用户自定义分配的情况
    // 从 needAssign 中移除已经分配的 task,并记录分配的 worker 到 assignedWorkers 中
    this.getRightWorkers(context, needAssign, assignedWorkers, workersNum,
            // 获取用户自定义分配 worker slot 信息,排除状态为 unstopped 的 worker
            this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

    if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
        // 2.2 如果配置指定要复用旧的分配,则优先从旧的分配中选出合适的 worker
        this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers());
    } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) {
        // 2.3 如果是 rebalance 任务分配类型,且可以复用原来的 worker 则将原来分配的 worker 记录下来
        int cnt = 0;
        for (ResourceWorkerSlot worker : context.getOldWorkers()) {
            if (cnt < workersNum) {
                ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                resFreeWorker.setPort(worker.getPort());
                resFreeWorker.setHostname(worker.getHostname());
                resFreeWorker.setNodeId(worker.getNodeId());
                assignedWorkers.add(resFreeWorker);
                cnt++;
            } else {
                break;
            }
        }
    }

    LOG.info("Get workers from user define and old assignments: " + assignedWorkers);

    int restWorkerNum = workersNum - assignedWorkers.size(); // 还需要分配的 worker 数目
    if (restWorkerNum < 0) {
        throw new FailedAssignTopologyException(
                "Too many workers are required for user define or old assignments. " +
                        "workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size());
    }

    // 2.4 对于剩下需要的 worker,直接添加 ResourceWorkerSlot 实例对象
    for (int i = 0; i < restWorkerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }

    /*
     * 3. 遍历将 worker 分配给相应的 supervisor
     * - 如果 worker 指定了 supervisor,则优先分配给指定 supervisor
     * - 依据 supervisor 的负载情况优先选择负载较低的进行分配
     */
    List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors));
    } else {
        // 为 worker 分配对应的 supervisor
        this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;
}

为 worker 分配 supervisor 节点的过程可以概括为:

  1. 计算需要分配的 worker 数目,如果可用的 worker 数目不满足要求则会抛出异常
  2. 为需要分配的 worker 创建 ResourceWorkerSlot 分配单元信息,主要分为四种情况:
    • 用户自定义 worker slot 分配
    • 配置指定复用旧的分配信息则优先从旧的分配中选出合适的 worker slot
    • 对于 rebalance 任务分配类型,如果允许则复用原来的 worker slot
    • 剩余情况,创建新的 work slot
  3. 为 worker 分配相应的 supervisor 节点

下面主要来看一下步骤 3,相应实现位于 WorkerScheduler#putAllWorkerToSupervisor 方法中:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {

    // 遍历处理 worker,如果指定了 supervisor,且 supervisor 存在空闲端口,则将其分配给该 supervisor
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                // 如果当前 worker 对应的 hostname 是该 supervisor,且 supervisor 存在空闲的 worker
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname())
                        && supervisor.getAvailableWorkerPorts().size() > 0) {
                    /*
                     * 基于当前 supervisor 信息更新对应的 worker 信息:
                     *  1. 保证 worker 对应的端口号是当前 supervisor 空闲的,否则选一个 supervisor 空闲的给 worker
                     *  2. 设置 worker 对应的 nodeId 为当前 supervisor 的 ID
                     */
                    this.putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }

    // 更新 supervisor 列表,移除没有空闲端口的 supervisor
    supervisors = this.getResAvailSupervisors(supervisors);

    // 对 supervisor 按照空闲端口数由大到小排序
    Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

        @Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }

    });

    /*
     * 按照 supervisor 的负载对 worker 进行分配:
     * 1. 优先选择负载较低的 supervisor 进分配
     * 2. 如果 supervisor 都已经过载但还有未分配的 worker,则从过载 supervisor 优先选择空闲端口较多的进行分配
     */
    this.putWorkerToSupervisor(assignedWorkers, supervisors);
}

如果 worker 指定了 supervisor 节点,则会将其分配给对应的 supervisor,对于剩余的 worker 来说会考虑 supervisor 节点的负载进行分配,以保证集群中 supervisor 负载的均衡性。Storm 依据集群中 supervisor 节点的平均空闲端口数作为标准来衡量 supervisor 节点的负载,如果一个 supervisor 节点的空闲端口数小于该值则认为该 supervisor 过载。集群负载均衡性的保证主要参考以下两个规则:

  1. 优先选择负载较低的 supervisor 节点进分配
  2. 如果 supervisor 节点都处于过载状态,但还有未分配的 worker,则从过载 supervisor 节点中优先选择空闲端口较多的节点进行分配

再来看一下为 task 分配 worker 进程的过程,实现位于 TaskScheduler#assign 方法中,该方法按照组件的类别分先后对 task 进行 worker 分配,顺序如下:

task.on.differ.node=true

方法实现如下:

public List<ResourceWorkerSlot> assign() {
    if (tasks.size() == 0) { // 没有需要再分配的任务
        assignments.addAll(this.getRestAssignedWorkers());
        return assignments;
    }

    // 1. 处理设置了 task.on.differ.node=true 的组件,为其在不同 supervisor 节点上分配 worker
    Set<Integer> assignedTasks = this.assignForDifferNodeTask();

    // 2. 为剩余 task 分配 worker,不包含系统组件
    tasks.removeAll(assignedTasks);
    Map<Integer, String> systemTasks = new HashMap<>();
    for (Integer task : tasks) {
        String name = context.getTaskToComponent().get(task);
        if (Common.isSystemComponent(name)) {
            systemTasks.put(task, name);
            continue;
        }
        this.assignForTask(name, task);
    }

    // 3. 为系统组件 task 分配 worker, e.g. acker, topology master...
    for (Entry<Integer, String> entry : systemTasks.entrySet()) {
        this.assignForTask(entry.getValue(), entry.getKey());
    }

    // 记录所有分配了 task 的 worker 集合
    assignments.addAll(this.getRestAssignedWorkers());
    return assignments;
}

对于设置了 task.on.differ.node=true 的组件,要求名下的 task 需要运行在不同的 supervisor 节点上,所以需要优先进行分配,否则如果一些 supervisor 因为配额已满从资源池移除之后,很可能导致没有足够多的 supervisor 节点来满足此类组件的 task 分配需求。

对于这三类组件的 task 分配过程基本过程类似,基本流程可以概括如下:

  1. 基于多重选择器为当前 task 选择最优 worker 进行分配
  2. 将 task 加入到被分配 worker 的 task 列表,并更新 worker 持有的 task 数目
  3. 检查当前 worker 分配的 task 数目,如果配额已满则将其从资源池移除,不再分配新的 task
  4. 更新 task 所属组件分配在指定 worker 上的 task 数目

完成了 task 到 worker,以及 worker 到 supervisor 的配置关系,也就相当于完成了对当前 topology 的任务分配过程,紧接着 storm 会将任务分配信息记录到 ZK 对应的任务分配路径下面。需要清楚的一点是当前的分配还只是一个方案,storm 集群并没有开始真正执行当前 topology,如果需要真正启动方案的执行,storm 还需要调度各个 supervisor 节点按照方案启动相应的 worker 进程,并在每个 worker 进程上启动相应数量的线程来执行 task,相应过程我们后面会逐一进行分析。

(本篇完)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Apache Tomcat 6高级编程

Apache Tomcat 6高级编程

Vivek Chopra、Sing Li、Jeff Genender / 人民邮电出版社 / 2009-3 / 79.00元

《Apache Tomcat 6高级编程》全面介绍了安装、配置和运行Apache Tomcat服务器的知识。书中不仅提供了配置选项的逐行分析,还探究了Tomcat的特性和功能,可以帮助读者解决出现在系统管理的各个阶段的各种问题,包括共享主机、安全、系统测试和性能测试及调优。 《Apache Tomcat 6高级编程》重点讲解Tomcat 6的应用知识。从基本的Tomcat和Web应用程序配置......一起来看看 《Apache Tomcat 6高级编程》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试