内容简介:IComponent 是所有组件的接口,例如 IBasicBolt、IRichBolt、IBatchBolt 都继承自 IComponent,为拓扑中所有组件提供共同的方法。BaseComponent 是 Storm 提供的一个比较方便的抽象类,这个抽象类及其子类都或多或少实现了其接口定义的部分方法。IBolt 接口是 IRichBolt 要继承的接口。还有一些以 Base 开头的 Bolt 类,如 BaseBasicBolt,BaseRichBolt 等,在这些类中所实现的方法都为空,或者返回值为 NUL
IComponent 是所有组件的接口,例如 IBasicBolt、IRichBolt、IBatchBolt 都继承自 IComponent,为拓扑中所有组件提供共同的方法。BaseComponent 是 Storm 提供的一个比较方便的抽象类,这个抽象类及其子类都或多或少实现了其接口定义的部分方法。IBolt 接口是 IRichBolt 要继承的接口。还有一些以 Base 开头的 Bolt 类,如 BaseBasicBolt,BaseRichBolt 等,在这些类中所实现的方法都为空,或者返回值为 NULL。从下图中,可以从整体上看到这些类的关系图,从而理清这些类之间的关系及结构。
1. IComponent 与 BaseComponent
IComponent 继承 Serializable 接口,为拓扑中所有组件提供共同的方法,是所有组件的接口:
public interface IComponent extends Serializable { void declareOutputFields(OutputFieldsDeclarer declarer); Map<String, Object> getComponentConfiguration(); }
使用如下方法为拓扑中的流声明输出模式,OutputFieldsDeclarer 用于声明输出流ID,输出字段以及每个输出流是否是直接流:
void declareOutputFields(OutputFieldsDeclarer declarer);
使用如下方法声明针对当前组件的配置,只能覆盖 topology.*
配置。使用 TopologyBuilder 构建拓扑时,可以进一步覆盖该组件配置:
Map<String, Object> getComponentConfiguration();
BaseComponent 是 Storm 提供的一个比较方便的抽象类,这个抽象类及其子类都简单实现了其接口定义的部分方法,所实现的方法返回值为 NULL:
public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; } }
2. IBolt
IBolt 继承了 Serializable 接口,输入元组经过处理后输出相应的元组,可以执行过滤,连接以及聚合等操作。IBolt 可以不必立即处理接收的元组,而是保留元组在内存中以便稍后处理。
public interface IBolt extends Serializable { void prepare(Map stormConf, TopologyContext context, OutputCollector collector); void execute(Tuple input); void cleanup(); }
IBolt 的生命周期如下:在客户端上创建 IBolt 对象。在 Nimbus 上提交 Topology 后,创建出来的 IBolt 在序列化后被发送到具体执行的 Worker 上。在 Worker 上执行时,先调用 prepare 方法传入当前执行的上下文,然后调用 execute 方法,对元组进行处理。如果要参数化 IBolt 对象,需要通过构造函数来设置参数,并将参数保存在实例变量中(然后将其序列化并传送到跨集群执行的每个任务上)。使用传入的 OutputCollector 的 ack 方法或 fail 方法来反馈处理结果。
当初始化 Worker 上该组件的一个任务时会调用如下方法,并提供执行环境。stormConf 为 Bolt 提供配置,并与集群提供的配置进行合并。context 用来获取有关此任务在拓扑中的位置信息,包括此任务的任务ID和组件ID,输入和输出信息等。collector 用来从 Bolt 向下游发送元组,元组可以在任何时间点发送,不必处理完立即发送。collector 是线程安全的,可以保存在一个实例变量:
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
处理单个输入元组时会调用如下方法。Tuple 对象包含有关它来自哪个组件/流/任务的元数据。可以使用 Tuple.getValue
访问元组的值。IBolt 不必立即处理元组,而是挂起稍后处理。使用 prepare 方法提供的 OutputCollector 来发送元组。要求所有输入元组使用 OutputCollector 的 ack 或 fail 方法进行反馈。否则,Storm 无法确定从 Spout 发送的元组什么时候完成:
void execute(Tuple input);
当停掉 Bolt 实例时会调用如下方法,但是不保证一定会调用该方法:
void cleanup();
3. RichBolt VS BasicBolt
Storm 提供了两种不同类型的 Bolt,分别是 RichBolt(IRichBolt, BaseRichBolt) 和 BasicBolt(IBasicBolt, BaseBasicBolt),很多使用 Storm 的人无法分清 BasicBolt 和 RichBolt 之间的区别。我们的建议是尽可能的使用 BasicBolt。
这两个类继承的父类如第一个图所示,它们的共同之处是父类中都有 BaseComponent 和 ICompont。不同之处是 BaseRichBolt 实现有 IBolt 和 IRichBolt 接口,而 BaseBasicBolt 只有 IBasicBolt 接口。其实本质的区别在于 IBolt 和 IBasicBolt 的区别:
public interface IBasicBolt extends IComponent { void prepare(Map stormConf, TopologyContext context); void execute(Tuple input, BasicOutputCollector collector); void cleanup(); } public interface IBolt extends Serializable { void prepare(Map stormConf, TopologyContext context, OutputCollector collector); void execute(Tuple input); void cleanup(); }
RichBolt 继承 IBolt 接口,使用 OutputCollector 的如下方法来发送元组:
// 向指定数据流发送锚定的元组, 需要向 Acker 发送 ack 确认, 可靠传递 List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple); // 向指定数据流发送未锚定的元组, 不需要向 Acker 发送 ack 确认, 是不可靠传递 List<Integer> emit(String streamId, List<Object> tuple); // 向默认数据流发送锚定的元组, 需要向 Acker 发送 ack 确认, 可靠传递 List<Integer> emit(Tuple anchor, List<Object> tuple); // 向默认数据流发送未锚定的元组, 不需要向 Acker 发送 ack 确认, 是不可靠传递 List<Integer> emit(List<Object> tuple);
BasicBolt 使用 BasicOutputCollector 的如下方法来发送元组:
// 向指定数据流发送锚定的元组, 需要向 Acker 发送 ack 确认, 可靠传递 List<Integer> emit(String streamId, List<Object> tuple); // 向默认数据流发送锚定的元组, 需要向 Acker 发送 ack 确认, 可靠传递 List<Integer> emit(List<Object> tuple);
两个 Bolt 都可以实现可靠性消息传递,不过 RichBolt 需要自己做很多周边的事情(例如,建立 Anchor 树,以及手动 ACK/FAIL 通知 Acker),而 BasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单。
4. 实现(不)可靠性消息传递
下面我们看一下如何使用上面的 Bolt 来实现(不)可靠性消息传递。
(1) 使用 BaseRichBolt 实现不可靠的Bolt
public class SplitSentence extends BaseRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } // 在这即使Ack也是没有用处的 // collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
这种方式我们没有手动建立 Anchor 树以及手动 Ack 或者 Fail,所以这是一种不可靠的消息传递方式。
(2) 使用 BaseRichBolt 实现可靠的Bolt
public class SplitSentence extends BaseRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
这种方式需要我们手动的建立 Anchor 树以及手动的 Ack 或者 Fail,所以这是一种可靠的消息传递方式。
(3) 使用 BaseBasicBolt 实现可靠的Bolt
public class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
这种方式由 Storm 自动帮助我们建立 Anchor 树以及发送 Ack 或者 Fail。这是一种可靠的消息传递方式。我们只需要关心业务逻辑即可。
英译对照:
-
直接流: direct stream
欢迎关注我的公众号和博客:
参考:
-
Storm 的可靠性保证测试
-
IBasicBolt vs IRichBolt
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python Web开发:测试驱动方法
Harry J.W. Percival / 安道 / 人民邮电出版社 / 2015-10 / 99
本书从最基础的知识开始,讲解Web开发的整个流程,展示如何使用Python做测试驱动开发。本书由三个部分组成。第一部分介绍了测试驱动开发和Django的基础知识。第二部分讨论了Web开发要素,探讨了Web开发过程中不可避免的问题,及如何通过测试解决这些问题。第三部分探讨了一些高级话题,如模拟技术、集成第三方插件、Ajax、测试固件、持续集成等。本书适合Web开发人员阅读。一起来看看 《Python Web开发:测试驱动方法》 这本书的介绍吧!