内容简介:本篇主要介绍task的创建与运行过程和worker的启动方式类似 (JStorm对初始化方法和启动方法使用了相同风格的命名, 减少了理解成本 ^_^)
本篇主要介绍task的创建与运行过程
文章开头, 先抛出一些疑问:
- 为什么TaskTransfer、TaskReceiver 要在初始化Task的时候创建, 为什么不在Worker里直接创建好? 这么做有什么好处呢?
- TaskTransfer/Receiver 与 Task 是1对1的关系么? 如果是,为什么? 如果不是,那设置了怎样的值, 为什么?
- start up bolt 发给系统 bolt 是做什么用的?
- 至于在创建拓扑时的并行度等属性, 在提交拓扑时(?确定是这个时候么?)就已经分配好了. 例如 BoltA 并行度为 3, 那么会创建3个Task对象, 每个对象都持有BoltA? 而不是一个Task 持有多个 Executor ,每个Executor使用执行bolt的业务代码 ?
- 为什么创建executor时,要传递当前task进去,如 baseExecutor = new BoltExecutors(this) . 而不是传递必须的参数进去.这是否是为了强调task和executor之间的某种联系?如果是,那么又是什么联系呢? 在较早版本的源代码里面,是否有反映这个联系呢?
创建过程:
和worker的启动方式类似 (JStorm对初始化方法和启动方法使用了相同风格的命名, 减少了理解成本 ^_^)
- 执行 Task#new, 将必需的参数从workerData中取出 比较重要的一步: 获取taskObj. 是通过Common#get_task_object获取, 这个对象就是在建立拓扑时 new 出的Bolt/Spout对象 (原汁原味,没有执行过prepare)
- 执行 Task#execute, 进行必要的初始化工作
- 发送“start up” bolt 给系统bolt
- 创建 TaskTransfer. 作用是当 Bolt 中的业务代码调用 collect#emit 时, collect 内部使用 TaskTransfer 来发送消息给下游 Task.
- 创建 Executor 并在 AsyncLoopThread 中启动 详见 jstrom executor
Executor 是真正运行 Bolt/Spout 的地方.- 调用 Bolt/Spout#prepare 来初始化, Collector 也是在此时初始化并传递给 Bolt 的
- 循环调用 Bolt/Spout#execute 来对消息进行处理
- 若 Bolt/Spout 需发送消息, 需调用 Collector#emit, 消息会经过 TaskTransfer 投递给下游 Task
- 创建 TaskReceiver. 作用是反序列化其他 Worker 中的上游 Task 发来的消息 (当前 Worker 的消息在投递时没有序列化过,自然更不需要反序列化),并交由 Executor 消费.
运行过程:
上文写到:task启动时,创建 Executor 并在 AsyncLoopThread 中启动. AsyncLoopThread 启动后, 会在 while 循环中, 不断地执行 Executor 对象的 run 方法.
Executor 对象有 3 个子类, 因为篇幅有限(为了偷懒), 这里仅介绍 BoltExecutors 所作的操作, 其他子类的原理类似, 可以自己研究.
首先来查看 BoltExecutors 的继承关系:
- 由于实现了Runnable 接口, 因此 BoltExecutors 可以被 AsyncLoopThread 执行, 可以知道 BoltExecutors 代码执行的入口是 run 方法.
- 继承了 EventHandler 接口 (这是LMAX 开发的 Disruptor 框架中提供的一个消息消费接口), 这告诉我们 BoltExecutors 具有从DisruptorQueue 消费消息的功能.
看run方法
BoltExecutors 会在 while 循环中不断批量消费 exeQueue(DisruptorQueue) 中的消息, 且调用时传入的 eventHandler 是自身. exeQueue 中存放的是待消费的消息 ( 上游Task emit的消息最终会放在这个队列中 ). 因此这里的 consumeBatchWhenAvailable 会调用 BoltExecutors#onEvent 方法,如下所示:
代码过长,只截一部分.
BoltExecutors的onEvent 方法做了以下操作:
- 当 event 为 Tuple 时, 会调用 IBolt#execute 进行处理.
- Tuple 的类型可能是single 或 batch, 如果是 batch, 那就 for 循环里调用 IBolt#execute
- 实际上会做一些更加细致的判断,例如:
- tuple 是否支持事务
- 是否是 IRichBolt 所 emit 的 tuple
- 是否是 system bolt
- 等等
- 实际上还会通过 Metric 做一些性能的记录
不过主要逻辑是1、2两点, 更加细致的操作, 后续有空可能会分享出来^_^.
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark 源码系列(三)作业运行过程
- 简单梳理Redux的源码与运行机制
- Kafka 源码解析:生产者运行机制
- Kafka 源码解析:消费者运行机制
- Composer 的 Autoload 源码实现——注册与运行
- JStorm 源码分析 - Worker 的启动与运行
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
500 Lines or Less
Amy Brown、Michael DiBernardo / 2016-6-28 / USD 35.00
This book provides you with the chance to study how 26 experienced programmers think when they are building something new. The programs you will read about in this book were all written from scratch t......一起来看看 《500 Lines or Less》 这本书的介绍吧!