内容简介:对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker
对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。
如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker 进程内部, 运行着许多线程, 包括: Task 线程、序列化/反序列化线程等. 其对应的代码为: com.alibaba.jstorm.daemon.worker.Worker
注意: Storm 与 JStorm 的 worker 模型有所不同, JStorm 移除了 executor 的概念, 详见“ JStorm 中 task 与 executor 的关系 ”.
启动
worker 有两种被启动的方式, 这个在 supervisor 一节已经提到.
- (本地调试) 调用 mk_worker 方法来启动
- (线上部署) 通过命令行调用 main, 在新的 JVM 中启动 Worker
在 mk_worker 中, 执行了如下操作:
- 打日志( sb 这个变量名我很喜欢)
- new Worker ,并设置必要的属性 ( 这里设置的属性真的太多了, 得再开一个章节细讲 ). 总的来说是以下事情:
- 算出当前worker会启动的task
- todo
- 执行 execute 在 execute 方法内, worker 完成了初始化和启动 task 的工作, 主要做了以下事情:
- worker 之间互连并启动一个线程监控变化,如果worker任务变更会与启停 worker 重连
- 监控 topology 是否 active 并将这个状态赋给 storm-active-atom 变量,task 根据这个变量决定是否调用 spout 的 nextTuple
- worker 启动线程来执行具体的 tasks
让我们详细看一下 execute 方法内部
在第一步中: 创建并启动分发控制信息的线程
// create recv connection, reduce the count of netty client reconnect AsyncLoopThread controlRvthread = startDispatchThread(); threads.add(controlRvthread);
- 使用Disruptor的队列建立了一个属于worker的 recvControlQueue . 不过到目前为止,没有足够的信息告诉我们这个队列是在worker的生命周期中起到什么作用的.
- 对recvControlQueue 的性能指标进行跟踪( 使用dropwizard/metrics,详见jstorm metric )
- 使用netty创建了一个server, 对指定端口进行监听. NettyServer 会将收到的消息放入 recvControlQueue 和 deserializeQueues ( 忘了 deserializeQueues 是啥? 看下这篇“tuple 在 整个拓扑中的流转过程” )中
- 新建并启动了一个 VirtulPortCtrlDispatch , 这又是个啥东西呢. 似乎只是用来在启动的过程中, 向各个 task 发送控制信息. 上面所述的 recvControlQueue 中的元素会按照taskId 被分发到 VirtulPortCtrlDispatch 持有的 controlQueues 存放的task的 ctrlQueue 中. 也就是 recvControlQueue 中装的东东会被放到 VirtulPortCtrlDispatch.controlQueues 的 ctrlQueue中.那么问题来了, recvControlQueue 中的元素又是哪来的呢? (这个控制信息猜测是后续的)
在第二步中: 创建并启动”维护task之间连接”的线程
创建了一个 RefreshConnections .
为了配置这个类,需要从上下文中取出本次拓扑的拓扑结构,以及当前 worker 将会启动的 task 的taskid 列表 然后通过这两个信息算出当前 worker 会和那些 task 进行连接,不区分是本 worker 和其他 worker 主要工作: 该类会在 worker 的运行过程中,定时去做下面这些事: 维护 task 之间的通信(目前使用的是 netty ):包括新建连接,移除旧的连接。(因为 worker 是可能挂掉掉并重启的,此时 task 之间的链接就需要重新维护) 其中会去计算出 task 的下游 task 在哪个 worker 中. 然后启动 NettyClient 去 connect 下游 worke r 的 port.
在第三步中:创建并启动维护zk状态的线程
在第四步中:创建并启动发送控制信息的线程(DrainerCtrlRunable)这名字取得挺有意思的, 排水控制?
主要工作:
//从transferCtrlQueue里消费 super(workerData.getTransferCtrlQueue (), idStr)
这家伙会发送控制信息
TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage); conn.sendDirect(message);
第五步中:创建并启动心跳维护线程
??维护心跳,谁的心跳?怎么维护?为什么要维护?
第六步中:为worker创建一个metric统计的类
(如果后续datacore想要维护好性能的话,可以考虑加入这个东西来统计各个因子的数据。但是这个对代码改动还挺多的,因此还是看情况吧)
第七步中: 创建并启动task线程. (终于创建了task)
这里已经创建并启动了 task 线程.
第八步中:创建并启动n个序列化,反序列化线程。
这个n还挺讲究的,有专门的算法,代码先贴在下面,可以考虑下为啥要这么计算
最后一步: 初始化步骤结束
经过哐哐哐一通创建,现在有了好多个不同的线程,这些线程被塞到workData里,然后一起返回给了 mk_worker 方法的调用者。 sd.join是说,当worker所创建的所有线程都运行结束后,worker线程才结束.
那么到这里,worker已经完成了所有启动需要的操作.
运行时
todo
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark 源码系列(三)作业运行过程
- 简单梳理Redux的源码与运行机制
- Kafka 源码解析:生产者运行机制
- Kafka 源码解析:消费者运行机制
- Composer 的 Autoload 源码实现——注册与运行
- JStorm 源码分析 - Task 的启动与运行
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Don't Make Me Think
Steve Krug / New Riders Press / 18 August, 2005 / $35.00
Five years and more than 100,000 copies after it was first published, it's hard to imagine anyone working in Web design who hasn't read Steve Krug's "instant classic" on Web usability, but people are ......一起来看看 《Don't Make Me Think》 这本书的介绍吧!