JStorm 源码解析:worker 的启动和运行机制

栏目: 编程工具 · 发布时间: 6年前

内容简介:上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函

上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。

Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函数的触发,supervisor 在启动相应 worker 进程时会指定 topologyId、supervisorId、workerPort、workerId,以及 classpath 等参数,worker 在拿到这些参数之后会先获取当前机器上端口对应的老进程,并逐一 kill 掉,然后调用 Worker#mk_worker 方法创建并启动对应的 worker 实例,该方法的核心实现如下:

Worker w = new Worker(conf, context, topologyId, supervisorId, port, workerId, jarPath);
return w.execute();

Worker 类仅包含一个实例属性 WorkerData,它封装了所有与 worker 运行相关的属性,实例化 Worker 对象的过程也是初始化 WorkerData 属性的过程,该过程主要包含以下工作:

workers/${worker_id}/pids

初始化完成之后会调用 Worker#execute 方法创建并启动 worker 进程,该方法主要的执行流程可以概括如下:

  1. 为当前 worker 创建并启动一个 socket 连接,用于接收消息并分发给名下的 task 线程
  2. 启动一个线程用于维护当前 worker 状态变更时,更新与其它 worker 之间的连接关系
  3. 启动一个线程用于定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地相应操作
  4. 启动一个线程循环消费当前 worker 的 tuple 队列发送给对应的下游 task 线程
  5. 启动一个线程用于定期更新本地的 worker 心跳信息
  6. 创建并启动当前 worker 下所有的 task 任务

方法实现如下:

public WorkerShutdown execute() throws Exception {
    List<AsyncLoopThread> threads = new ArrayList<>();

    // 1. 为 worker 创建一个 socket 连接,接收和分发消息给对应的 task
    AsyncLoopThread controlRvThread = this.startDispatchThread();
    threads.add(controlRvThread);

    // 2. 创建线程用于在 worker 关闭或者新启动时更新与其他 worker 之间的连接信息
    RefreshConnections refreshConn = this.makeRefreshConnections();
    AsyncLoopThread refreshConnLoopThread = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
    threads.add(refreshConnLoopThread);

    // 3. 获取 topology 在 ZK 上的状态,当状态发生变更时更新本地 task 状态
    RefreshActive refreshZkActive = new RefreshActive(workerData);
    AsyncLoopThread refreshZk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
    threads.add(refreshZk);

    // 4. 创建一个线程循环消费 tuple 队列发送给对应的下游 task
    DrainerCtrlRunnable drainerCtrlRunnable = new DrainerCtrlRunnable(workerData, MetricDef.SEND_THREAD);
    AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunnable, false, Thread.MAX_PRIORITY, true);
    threads.add(controlSendThread);

    // Sync heartbeat to Apsara Container
    AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
    if (syncContainerHbThread != null) {
        threads.add(syncContainerHbThread);
    }

    JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
    metricReporter.init();
    workerData.setMetricsReporter(metricReporter);

    // 5. 更新本地心跳信息
    RunnableCallback heartbeatFn = new WorkerHeartbeatRunnable(workerData);
    AsyncLoopThread hb = new AsyncLoopThread(heartbeatFn, false, null, Thread.NORM_PRIORITY, true);
    threads.add(hb);

    // 6. 创建并启动当前 worker 下所有的 task
    List<TaskShutdownDaemon> shutdownTasks = this.createTasks();
    workerData.setShutdownTasks(shutdownTasks);

    List<AsyncLoopThread> serializeThreads = workerData.setSerializeThreads();
    threads.addAll(serializeThreads);
    List<AsyncLoopThread> deserializeThreads = workerData.setDeserializeThreads();
    threads.addAll(deserializeThreads);

    return new WorkerShutdown(workerData, threads);
}

一. 消息接收与分发

Storm 会为 worker 基于 Netty 创建并返回一个 socket 连接用于接收消息,同时 worker 与名下所有 task 之间会维持一个传输队列,并启动一个线程循环消费接收到的消息投递给对应 task 的传输队列中。该过程位于 Worker#startDispatchThread 方法中,该方法实现如下(去掉了一些非关键代码):

private AsyncLoopThread startDispatchThread() {
    IContext context = workerData.getContext(); // 获取消息上下文:NettyContext
    String topologyId = workerData.getTopologyId();

    // 1. 创建一个接收消息的消息队列(disruptor)
    Map stormConf = workerData.getStormConf();
    long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); // 默认 10ms
    WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); // 10ms
    int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256);
    DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0);

    // 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
    IConnection recvConnection = context.bind(
            topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds());
    workerData.setRecvConnection(recvConnection);

    // 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
    //    最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
    RunnableCallback recvControlDispatcher = new VirtualPortCtrlDispatch(
            workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
    return new AsyncLoopThread(recvControlDispatcher, false, Thread.MAX_PRIORITY, true);
}

这里的消息队列底层都依赖于 disruptor 实现,最终对于接收到的消息都会调用 VirtualPortCtrlDispatch#handleEvent 方法进行处理:

public void handleEvent(Object event, boolean endOfBatch) throws Exception {
    TaskMessage message = (TaskMessage) event;
    int task = message.task(); // 获取当前消息对应的 taskId

    // 消息反序列化
    Object tuple = null;
    try {
        // there might be errors when calling update_topology
        tuple = this.deserialize(message.message(), task);
    } catch (Throwable e) {
        if (Utils.exceptionCauseIsInstanceOf(KryoException.class, e)) {
            throw new RuntimeException(e);
        }
        LOG.warn("serialize msg error", e);
    }

    // 获取 taskId 对应的消息通道
    DisruptorQueue queue = controlQueues.get(task);
    if (queue == null) {
        LOG.warn("Received invalid control message for task-{}, Dropping...{} ", task, tuple);
        return;
    }
    if (tuple != null) {
        // 将消息投递给对应的 task 传输队列
        queue.publish(tuple);
    }
}

二. 创建并启动用于维护 worker 之间连接关系的线程

在这一步会创建一个 RefreshConnections 对象,它继承了 RunnableCallback 类,所以同样是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshConnections#run 方法),storm 会定期检测 ZK 上的 topology 任务分配信息是否有更新,如果有比本地更新的任务分配(依赖于任务分配时间戳进行判定)则会判断新任务分配的类型来相应的更新本地的信息。

如果当前的任务分配类型仅仅是更新集群上已有的 topology,则 storm 会遍历通知各个 task 执行相应的更新操作,同时会回调已注册的所有更新监听器以更新配置信息,实现如下:

// 当前任务分配已经更新且是更新 topology 操作,则通知所有的 task
List<TaskShutdownDaemon> taskShutdowns = workerData.getShutdownTasks();
Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
workerData.getStormConf().putAll(newConf);
for (TaskShutdownDaemon taskSD : taskShutdowns) {
    // 通知所有的 task
    taskSD.update(newConf);
}
// disable/enable metrics on the fly
workerData.getUpdateListener().update(newConf); // 回调更新监听器,更新配置
workerData.setAssignmentType(AssignmentType.UpdateTopology);

如果当前是更新以外的任务分配类型(Assign、ScaleTopology),则 storm 会从新的任务分配信息中分别获取新增的、待删除的,以及需要更新的 taskId 列表,并执行相应的创建、删除,以及更新 task 操作,同时会更新 worker 上所有 task 的下游 task 列表信息。部分代码实现如下:

// 获取新增的 taskId 列表
Set<Integer> addedTasks = this.getAddedTasks(assignment);
// 获取待删除的 taskId 列表
Set<Integer> removedTasks = this.getRemovedTasks(assignment);
// 获取待更新的 taskId 列表
Set<Integer> updatedTasks = this.getUpdatedTasks(assignment);

// 基于新任务分配信息更新 workerData
workerData.updateWorkerData(assignment);
workerData.updateKryoSerializer();

// 关闭需要移除的 task
this.shutdownTasks(removedTasks);
// 创建新增的 task
this.createTasks(addedTasks);
// 更新已有需要被更新的 task
this.updateTasks(updatedTasks);

// 更新当前 worker 上所有 task 的下游 task 列表信息
Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData);
if (!outboundTasks.equals(tmpOutboundTasks)) {
    for (int taskId : tmpOutboundTasks) {
        if (!outboundTasks.contains(taskId)) {
            workerData.addOutboundTaskStatusIfAbsent(taskId);
        }
    }
    for (int taskId : workerData.getOutboundTaskStatus().keySet()) {
        if (!tmpOutboundTasks.contains(taskId)) {
            workerData.removeOutboundTaskStatus(taskId);
        }
    }
    workerData.setOutboundTasks(tmpOutboundTasks);
    outboundTasks = tmpOutboundTasks;
}
workerData.setAssignmentType(AssignmentType.Assign);

三. 创建并启动定期获取 topology 基本信息的线程

在这一步会创建一个 RefreshActive 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshActive#run 方法),storm 会定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地执行相应的操作。

如果 topology 状态信息变为 active、upgrading,或者 rollback 时,storm 会依次将本地 task 的状态设置为 TaskStatus.RUN ,如果当前 task 对应的组件是 spout,则会触发 ISpout#activate 方法。如果当前 topology 状态不为 inactive 时,storm 会依次将本地的 task 状态设置为 TaskStatus.PAUSE ,如果当前 task 对应的组件是 spout,则会触发 ISpout#deactivate 方法。最后更新本地记录的 topology 状态。相关实现如下:

if (newTopologyStatus.equals(StatusType.active) // 激活
        || newTopologyStatus.equals(StatusType.upgrading) // 灰度
        || newTopologyStatus.equals(StatusType.rollback)) { // 回滚
    for (TaskShutdownDaemon task : tasks) {
        if (task.getTask().getTaskStatus().isInit()) {
            task.getTask().getTaskStatus().setStatus(TaskStatus.RUN);
        } else {
            task.active();
        }
    }
} else if (oldTopologyStatus == null || !oldTopologyStatus.equals(StatusType.inactive)) {
    for (TaskShutdownDaemon task : tasks) {
        if (task.getTask().getTaskStatus().isInit()) {
            task.getTask().getTaskStatus().setStatus(TaskStatus.PAUSE);
        } else {
            task.deactive();
        }
    }
}
workerData.setTopologyStatus(newTopologyStatus);

四. 创建并启动循环消费 worker tuple 队列的线程

在这一步会创建一个 DrainerCtrlRunnable 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 DrainerCtrlRunnable#run 方法),storm 会循环消费当前 worker 的 tuple 队列 transferCtrlQueue,并最终调用 DrainerCtrlRunnable#handleEvent 方法对拿到的消息进行处理,该方法的实现如下:

public void handleEvent(Object event, boolean endOfBatch) throws Exception {
    if (event == null) {
        return;
    }
    ITupleExt tuple = (ITupleExt) event;
    int targetTask = tuple.getTargetTaskId();

    // 获取与下游 task 的连接
    IConnection conn = this.getConnection(targetTask);
    if (conn != null) {
        byte[] tupleMessage = null;
        try {
            // there might be errors when calling update_topology
            tupleMessage = this.serialize(tuple); // 序列化数据
        } catch (Throwable e) {
            // 省略异常处理
        }
        // 基于 netty 发送数据
        TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage);
        conn.sendDirect(message);
    }
}

方法的逻辑比较简单,拿到当前 tuple 对应的下游 taskId,然后与之建立连接(netty)并将 tuple 发送给它。

五. 创建并启动当前 worker 下所有的 task 线程

方法 Worker#createTasks 用于为当前 worker 下的所有 task 任务创建一个 Task 对象,并为每个 task 启动一个线程执行,同时为每个 task 任务创建一个 TaskShutdownDaemon 对象用于管理对应的 task 线程,方法的实现如下:

private List<TaskShutdownDaemon> createTasks() throws Exception {
    List<TaskShutdownDaemon> shutdownTasks = new ArrayList<>();

    // 获取当前 worker 下所有的 taskId
    Set<Integer> taskIds = workerData.getTaskIds();

    Set<Thread> threads = new HashSet<>();
    List<Task> taskArrayList = new ArrayList<>();
    for (int taskId : taskIds) {
        // 创建并启动 task
        Task task = new Task(workerData, taskId);
        Thread thread = new Thread(task);
        threads.add(thread);
        taskArrayList.add(task);
        thread.start(); // 启动 task
    }
    for (Thread thread : threads) {
        thread.join();
    }
    for (Task t : taskArrayList) {
        shutdownTasks.add(t.getTaskShutdownDameon());
    }
    return shutdownTasks;
}

Task 类实现了 Runnable 接口,其 run 方法中简单调用了 Task#execute 方法,该方法首先会向系统 bolt 发送一条“startup”消息,然后依据当前的组件类型创建对应的任务执行器,创建的过程位于 Task#mkExecutor 方法中:

public BaseExecutors mkExecutor() {
    BaseExecutors baseExecutor = null;

    if (taskObj instanceof IBolt) {
        if (taskId == topologyContext.getTopologyMasterId()) {
            baseExecutor = new TopologyMasterBoltExecutors(this);
        } else {
            baseExecutor = new BoltExecutors(this);
        }
    } else if (taskObj instanceof ISpout) {
        if (this.isSingleThread(stormConf)) {
            baseExecutor = new SingleThreadSpoutExecutors(this);
        } else {
            baseExecutor = new MultipleThreadSpoutExecutors(this);
        }
    }

    return baseExecutor;
}

BaseExecutors 类是一个 RunnableCallback 类,所以其 run 方法会被异步循环调用。继承自 BaseExecutors 类有 5 个(如下),而 Task#mkExecutor 方法基于组件类型分别选择了相应的实现类进行实例化。

  • BoltExecutors
  • TopologyMasterBoltExecutors
  • SpoutExecutors
  • SingleThreadSpoutExecutors
  • MultipleThreadSpoutExecutors

先来看一下 BoltExecutors 和 TopologyMasterBoltExecutors,这是 bolt 组件的任务执行器,其中 TopologyMasterBoltExecutors 继承自 BoltExecutors,所以接下来我们主要来看一下 BoltExecutors 的实现。BoltExecutors 类的 run 方法实现如下:

public void run() {
    if (!isFinishInit) {
        // 执行初始化操作,主要是调用了 IBolt.prepare 方法
        this.initWrapper();
    }
    while (!taskStatus.isShutdown()) {
        try {
            // 循环消费当前 task 的消息队列
            this.consumeExecuteQueue();
        } catch (Throwable e) {
            // 省略异常处理逻辑
        }
    }
}

方法首先会判定是否完成了初始化操作,如果未完成则会调用 BaseExecutors#initWrapper 执行初始化,这期间主要是调用了 IBolt#prepare 方法,这也是我们在实现一个 bolt 时执行初始化的方法。如果当前 task 线程没有被销毁,则会一直循环调用 BoltExecutors#consumeExecuteQueue 消费当前 task 的消息队列。前面的分析我们知道 worker 会对接收到的消息按照 taskId 投递给对应 task 的消息队列,而消息队列的消费过程就在这里发生。针对接收到消息会逐条进行处理,这里最终调用的是 BoltExecutors#onEvent 方法,处理的消息就是我们熟悉的 Tuple 对象,而该方法的核心就是调用 IBolt#execute 方法,也就是调用用户自定义的策略对收到的 tuple 进行处理。

再来看一下 SingleThreadSpoutExecutors 和 MultipleThreadSpoutExecutors,这两类都继承自 SpoutExecutors 类,区别仅在于对于消息的附加处理和正常的业务逻辑是否位于同一个线程中,而核心逻辑都是调用 ISpout#nextTuple 方法,也就是执行用户自定义的业务逻辑。

针对 worker 的运行机制就分析到这里,但是 storm 对于消息的处理并没有结束,下一篇我们将一起探寻 ack 机制,看看 storm 如何保证消息至少被执行一次(at least once)。

(本篇完)

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据库索引设计与优化

数据库索引设计与优化

【美】Tapio Lahdenmaki、【美】Michael Leach / 曹怡倩、赵建伟 / 电子工业出版社 / 2015-6 / 79.00元

《数据库索引设计与优化》提供了一种简单、高效、通用的关系型数据库索引设计方法。作者通过系统的讲解及大量的案例清晰地阐释了关系型数据库的访问路径选择原理,以及表和索引的扫描方式,详尽地讲解了如何快速地估算SQL 运行的CPU 时间及执行时间,帮助读者从原理上理解SQL、表及索引结构、访问方式等对关系型数据库造成的影响,并能够运用量化的方法进行判断和优化,指导关系型数据库的索引设计。 《数据库索......一起来看看 《数据库索引设计与优化》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具