内容简介:对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker
对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。
如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker 进程内部, 运行着许多线程, 包括: Task 线程、序列化/反序列化线程等. 其对应的代码为: com.alibaba.jstorm.daemon.worker.Worker
注意: Storm 与 JStorm 的 worker 模型有所不同, JStorm 移除了 executor 的概念, 详见“ JStorm 中 task 与 executor 的关系 ”.
启动
worker 有两种被启动的方式, 这个在 supervisor 一节已经提到.
- (本地调试) 调用 mk_worker 方法来启动
- (线上部署) 通过命令行调用 main, 在新的 JVM 中启动 Worker
在 mk_worker 中, 执行了如下操作:
- 打日志( sb 这个变量名我很喜欢)
- new Worker ,并设置必要的属性 ( 这里设置的属性真的太多了, 得再开一个章节细讲 ). 总的来说是以下事情:
- 算出当前worker会启动的task
- todo
- 执行 execute 在 execute 方法内, worker 完成了初始化和启动 task 的工作, 主要做了以下事情:
- worker 之间互连并启动一个线程监控变化,如果worker任务变更会与启停 worker 重连
- 监控 topology 是否 active 并将这个状态赋给 storm-active-atom 变量,task 根据这个变量决定是否调用 spout 的 nextTuple
- worker 启动线程来执行具体的 tasks
让我们详细看一下 execute 方法内部
在第一步中: 创建并启动分发控制信息的线程
// create recv connection, reduce the count of netty client reconnect AsyncLoopThread controlRvthread = startDispatchThread(); threads.add(controlRvthread);
- 使用Disruptor的队列建立了一个属于worker的 recvControlQueue . 不过到目前为止,没有足够的信息告诉我们这个队列是在worker的生命周期中起到什么作用的.
- 对recvControlQueue 的性能指标进行跟踪( 使用dropwizard/metrics,详见jstorm metric )
- 使用netty创建了一个server, 对指定端口进行监听. NettyServer 会将收到的消息放入 recvControlQueue 和 deserializeQueues ( 忘了 deserializeQueues 是啥? 看下这篇“tuple 在 整个拓扑中的流转过程” )中
- 新建并启动了一个 VirtulPortCtrlDispatch , 这又是个啥东西呢. 似乎只是用来在启动的过程中, 向各个 task 发送控制信息. 上面所述的 recvControlQueue 中的元素会按照taskId 被分发到 VirtulPortCtrlDispatch 持有的 controlQueues 存放的task的 ctrlQueue 中. 也就是 recvControlQueue 中装的东东会被放到 VirtulPortCtrlDispatch.controlQueues 的 ctrlQueue中.那么问题来了, recvControlQueue 中的元素又是哪来的呢? (这个控制信息猜测是后续的)
在第二步中: 创建并启动”维护task之间连接”的线程
创建了一个 RefreshConnections .
为了配置这个类,需要从上下文中取出本次拓扑的拓扑结构,以及当前 worker 将会启动的 task 的taskid 列表 然后通过这两个信息算出当前 worker 会和那些 task 进行连接,不区分是本 worker 和其他 worker 主要工作: 该类会在 worker 的运行过程中,定时去做下面这些事: 维护 task 之间的通信(目前使用的是 netty ):包括新建连接,移除旧的连接。(因为 worker 是可能挂掉掉并重启的,此时 task 之间的链接就需要重新维护) 其中会去计算出 task 的下游 task 在哪个 worker 中. 然后启动 NettyClient 去 connect 下游 worke r 的 port.
在第三步中:创建并启动维护zk状态的线程
在第四步中:创建并启动发送控制信息的线程(DrainerCtrlRunable)这名字取得挺有意思的, 排水控制?
主要工作:
//从transferCtrlQueue里消费 super(workerData.getTransferCtrlQueue (), idStr)
这家伙会发送控制信息
TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage); conn.sendDirect(message);
第五步中:创建并启动心跳维护线程
??维护心跳,谁的心跳?怎么维护?为什么要维护?
第六步中:为worker创建一个metric统计的类
(如果后续datacore想要维护好性能的话,可以考虑加入这个东西来统计各个因子的数据。但是这个对代码改动还挺多的,因此还是看情况吧)
第七步中: 创建并启动task线程. (终于创建了task)
这里已经创建并启动了 task 线程.
第八步中:创建并启动n个序列化,反序列化线程。
这个n还挺讲究的,有专门的算法,代码先贴在下面,可以考虑下为啥要这么计算
最后一步: 初始化步骤结束
经过哐哐哐一通创建,现在有了好多个不同的线程,这些线程被塞到workData里,然后一起返回给了 mk_worker 方法的调用者。 sd.join是说,当worker所创建的所有线程都运行结束后,worker线程才结束.
那么到这里,worker已经完成了所有启动需要的操作.
运行时
todo
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark 源码系列(三)作业运行过程
- 简单梳理Redux的源码与运行机制
- Kafka 源码解析:生产者运行机制
- Kafka 源码解析:消费者运行机制
- Composer 的 Autoload 源码实现——注册与运行
- JStorm 源码分析 - Task 的启动与运行
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Mathematica演示项目笔记
吴飞 / 清华大学出版社 / 2010-7 / 39.00元
Mathematica是由美国科学家斯蒂芬·沃尔夫勒姆(Stephen Wolfram)领导的Wolfram Research Inc.研究公司所开发的一体化计算引擎。《Mathematica演示项目笔记》结合Mathematica演示项目以及第6版和第7版的最新功能(动态交互性、即时三维图形、数值模拟和仿真、高效实时计算、集成语言系统、多核并行计算和数字图像处理等),讲解了符号计算、程序设计、算......一起来看看 《Mathematica演示项目笔记》 这本书的介绍吧!