内容简介:Storm topology 是由 spout 和 bolt 构建的有向无环图,其中 spout 是图的起始节点,用于发送数据,而 bolt 是图的中间节点和末端节点,用于对数据进行处理。下面我们先用一个简单的 wordcount 示例来回忆一下 storm 的基本使用,然后对示例中涉及到的 storm 编程接口从源码层面分析其内在实现。本节中我们将实现一个 wordcount 程序,其中 spout 用于发送句子,bolt 负责对句子进行切分、单词统计,以及最终打印等工作。实现一个 spout 最常见的
Storm topology 是由 spout 和 bolt 构建的有向无环图,其中 spout 是图的起始节点,用于发送数据,而 bolt 是图的中间节点和末端节点,用于对数据进行处理。下面我们先用一个简单的 wordcount 示例来回忆一下 storm 的基本使用,然后对示例中涉及到的 storm 编程接口从源码层面分析其内在实现。
一. 简单示例:Word Count
本节中我们将实现一个 wordcount 程序,其中 spout 用于发送句子,bolt 负责对句子进行切分、单词统计,以及最终打印等工作。
实现 spout
实现一个 spout 最常见的方式是继承 BaseRichSpout
抽象类,我们的句子发送 spout 实现如下:
public class SentenceSpout extends BaseRichSpout implements FieldName {
private static final long serialVersionUID = -2075595983328401544L;
private static final Logger log = LoggerFactory.getLogger(SentenceSpout.class);
private static final String[] SENTENCES = {
"It was getting dark, and we weren’t there yet.",
"Don't step on the broken glass.",
"The book is in front of the table.",
"Yeah, I think it's a good environment for learning English.",
"I will never be this young again. Ever. Oh damn… I just got older.",
"Joe made the sugar cookies; Susan decorated them.",
"I really want to go to work, but I am too sick to drive.",
"There was no ice cream in the freezer, nor did they have money to go to the store.",
"I want to buy a onesie, but know it won’t suit me."
};
private SpoutOutputCollector collector;
private int index;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
/*
* conf: storm 配置信息
* context: topology 中组件信息
*/
this.collector = collector;
}
@Override
public void nextTuple() {
String sentence = SENTENCES[index++ % SENTENCES.length];
collector.emit(new Values(sentence));
log.info("spout emit sentence[{}]", sentence);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(SENTENCE));
}
}
实现 bolt
实现一个 bolt 最常见的方式是继承 BaseRichBolt
抽象类,我们的句子切分、单词统计、结果打印三个 bolt 的实现分别如下:
- 句子切分:SentenceSplitBolt
public class SentenceSplitBolt extends BaseRichBolt implements FieldName {
private static final long serialVersionUID = -8048664019619422801L;
private static final Logger log = LoggerFactory.getLogger(SentenceSplitBolt.class);
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getStringByField(SENTENCE);
if (StringUtils.isBlank(sentence)) {
collector.ack(input);
return;
}
String[] words = sentence.split("\\s*");
log.info("sentence split bolt emit words[{}]", Arrays.toString(words));
Arrays.stream(words).forEach(word -> collector.emit(new Values(word)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(WORD));
}
}
- 单词统计:WordCountBolt
public class WordCountBolt extends BaseRichBolt implements FieldName {
private static final long serialVersionUID = -1872418993716384947L;
private OutputCollector collector;
private Map<String, Integer> countMap = new HashMap<>();
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String world = input.getStringByField(WORD);
countMap.put(world, countMap.getOrDefault(world, 0) + 1);
collector.emit(new Values(world, countMap.get(world)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(COUNT));
}
}
- 结果打印:ReportBolt
public class ReportBolt extends BaseRichBolt implements FieldName {
private static final long serialVersionUID = -4646679841956175658L;
private static final Logger log = LoggerFactory.getLogger(ReportBolt.class);
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField(WORD);
int count = input.getIntegerByField(COUNT);
log.info("report bolt, word[{}], count[{}]", word, count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// emit nothing
}
}
构建 topology
public class WordCountTopology implements ComponentId, FieldName {
private static final String TOPOLOGY_NAME = "wordcount-topology";
public static void main(String[] args) throws Exception {
SentenceSpout sentenceSpout = new SentenceSpout();
SentenceSplitBolt sentenceSplitBolt = new SentenceSplitBolt();
WordCountBolt wordCountBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout);
builder.setBolt(SENTENCE_SPLIT_BOLT_ID, sentenceSplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SENTENCE_SPLIT_BOLT_ID, new Fields(WORD));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
TimeUnit.MINUTES.sleep(10);
localCluster.killTopology(TOPOLOGY_NAME);
localCluster.shutdown();
}
}
二. 编程接口实现分析
2.1 基础实现类
- Fields
Fields 类用于指定消息的输出字段名称,它本质上是对 List 的包装,同时定义了一个 Map<String, Integer>
类型的 _index
属性用于记录字段及其索引之间的映射关系,同时提供了一些方法方便查询、获取指定字段下标,以及判断字段是否存在等操作。
- Values
Values 类与 Fields 相对应,后者用于指定输出字段的名称列表,而字段对应的值则由 Values 进行存储,它继承自 ArrayList。
- Tuple
Tuple 是对 topology 中传输数据的封装,提供了对于 Fields 和 Values 中数据获取(包括按类型获取)的接口。除此之外还提供了以下接口用于获取当前 tuple 的关联信息:
public interface Tuple extends ITuple {
/**
* 获取与该 tuple 对应的 GlobalStreamId 对象
*/
GlobalStreamId getSourceGlobalStreamId();
/**
* 获取创建当前 tuple 的组件 ID
*/
String getSourceComponent();
/**
* 获取创建当前 tuple 的 taskId
*/
int getSourceTask();
/**
* 获取当前 tuple 被 emit 的目标 streamId
*/
String getSourceStreamId();
/**
* 获取当前 tuple 的消息序号,用来追踪消息是否被成功处理
*/
MessageId getMessageId();
}
相对于原生 storm,在 jstorm 中还定义了 ITupleExt 接口,补充了一些额外的方法用于获取当前 tuple 的目标 taskId、创建时间、是否是 batch 类型,以及设置 batchId 等操作。
2.2 Thrift 数据结构
- GlobalStreamId
struct GlobalStreamId {
1: required string componentId; // 标识当前流所属组件
2: required string streamId; // 流的标识
}
流(stream)是 storm 中十分重要的一个概念,是消息传输的渠道,一个组件可以向多个流发送消息,也可以接收来自多个流的消息。
- StreamInfo
struct StreamInfo {
1: required list<string> output_fields; // 输出的字段名称列表
2: required bool direct; // 标识是否是直接流
}
- Grouping
union Grouping {
1: list<string> fields; // empty list means global grouping
2: NullStruct shuffle; // tuple is sent to random task
3: NullStruct all; // tuple is sent to every task
4: NullStruct none; // tuple is sent to a single task (storm's choice) -> allows storm to optimize the topology by bundling tasks into a single process
5: NullStruct direct; // this bolt expects the source bolt to send tuples directly to it
6: JavaObject custom_object;
7: binary custom_serialized;
8: NullStruct local_or_shuffle; // prefer sending to tasks in the same worker process, otherwise shuffle
9: NullStruct localFirst; // local worker shuffle > local node shuffle > other node shuffle
}
Grouping 表示消息的分组方式,被定义为 union 类型,意味着每个组件只能选择一种消息分组方式。各分组类型示意如下:
| 序号 | 分组方式 | 说明 |
|---|---|---|
| 1 | global | 将所有的 tuple 发送给目标组件的第一个 task |
| 2 | fields | 依据指定字段的值进行分组, 保证指定字段具有相同的值时会发送给同一个 task, 原理是对某个或几个字段值做哈希,然后对哈希值求模得出目标 task |
| 3 | shuffle | 轮询方式,随机平均发送 tuple 给下游组件 |
| 4 | all | 将 tuple 复制后发送给所有目标组件的所有 task |
| 5 | none | 随机发送 tuple 到目标组件,相对于 shuffle 而言无法保证平均 |
| 6 | direct | 调用 emitDirect 方法将 tuple 发送给指定的下游 task |
| 7 | custom | 使用用户接口 CustomStreamGrouping 选择目标 task |
| 8 | localOrShuffle | 本地 worker 优先,如果本地有目标组件的 task,则随机从本地内部的目标组件 task 列表中进行选择,否则就和 shuffle 分组方式一样,用于减少网络传输 |
| 9 | localFirst | 本地 worker 优先级最高,如果本地有目标组件的 task,则随机从本地内部的目标组件的 task 列表中进行选择;本节点优先级其次,当本地 worker 不能满足条件时,如果本地 supervisor 节点下其他 worker 有目标组件的 task,则随机从中选择一个 task 进行发送;当这两种情况都不满足时,则从其他 supervisor 节点的目标 task 中随机选择一个 task 进行发送 |
- ComponentCommon
struct ComponentCommon {
// 组件将从哪些 GlobalStreamId 以何种分组方式接收数据
1: required map<GlobalStreamId, Grouping> inputs;
// 组件要输出的所有流,key 是 streamId
2: required map<string, StreamInfo> streams;
// 组件并行度(即多少个线程),这些线程可能分布在不同的机器或进程空间中
3: optional i32 parallelism_hint;
// 组件相关配置项
4: optional string json_conf;
}
ComponentCommon 是 topology 的基础对象,用于描述一个组件。在 storm 中将 spout 和 bolt 统称为组件,TopologyBuilder 在构建 topology 时会将我们定义的 spout 和 bolt 封装成 ComponentCommon 对象进行存储。
- SpoutSpec
struct SpoutSpec {
// 存储 spout 的序列化对象
1: required ComponentObject spout_object;
// 描述 spout 输入输出的 ComponentCommon 对象
2: required ComponentCommon common;
}
- Bolt
struct Bolt {
// 存储 bolt 的序列化对象
1: required ComponentObject bolt_object;
// 描述 bolt 输入输出的 ComponentCommon 对象
2: required ComponentCommon common;
}
- StormTopology
struct StormTopology {
1: required map<string, SpoutSpec> spouts; // topology 中的 spout 集合
2: required map<string, Bolt> bolts; // topology 中的 bolt 集合
3: required map<string, StateSpoutSpec> state_spouts; // topology 中的 state spout 集合
}
StormTopology 用于描述 topology 的组成,包含 spout 和 bolt 组件,我们在使用 TopologyBuilder 类编程构建 topology 时,最终都是为了创建 StormTopology 对象。
- TopologySummary
struct TopologySummary {
1: required string id;
2: required string name;
3: required string status; // 状态信息
4: required i32 uptimeSecs; // 运行时长
5: required i32 numTasks; // task 数目
6: required i32 numWorkers; // worker 数目
7: optional string errorInfo;
}
当我们提交一个任务给 storm 集群时,nimbus 节点会生成当前任务对应 topology 的状态信息(TopologySummary),用于在 UI 上进行显示。
- NimbusSummary
struct NimbusStat {
1: required string host;
2: required string uptimeSecs; // 运行时间
}
struct NimbusSummary {
1: required NimbusStat nimbusMaster; // master 节点信息
2: required list<NimbusStat> nimbusSlaves; // slave 节点信息
3: required i32 supervisorNum;
4: required i32 totalPortNum;
5: required i32 usedPortNum;
6: required i32 freePortNum;
7: required string version;
}
NimbusSummary 用于描述一个 nimbus 节点的基本信息。
- SupervisorSummary
struct SupervisorSummary {
1: required string host; // 所属主机名
2: required string supervisorId;
3: required i32 uptimeSecs; // 运行时间
4: required i32 numWorkers; // 可以使用的 worker 数目
5: required i32 numUsedWorkers; // 已经使用的 worker 数目
6: optional string version;
7: optional string buildTs;
8: optional i32 port;
9: optional string errorMessage;
}
SupervisorSummary 用于描述一个 supervisor 节点的基本信息。
- ClusterSummary
struct ClusterSummary {
1: required NimbusSummary nimbus;
2: required list<SupervisorSummary> supervisors;
3: required list<TopologySummary> topologies;
}
ClusterSummary 用于描述整个集群的运行信息,包含 nimbus 节点、supervisor 节点,以及在整个集群上运行着的 topology 的摘要信息。
2.3 组件接口
IComponent 接口
Spout 和 bolt 在 storm 中统称为组件,IComponent 接口是对组件的顶层抽象,实现如下:
public interface IComponent extends Serializable {
/**
* 定义组件输出的 Schema,包括输出的 streamId、输出的字段名称列表,以及标识是否是直接流
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
/**
* 获取与组件相关的配置
*/
Map<String, Object> getComponentConfiguration();
}
ISpout 接口
ISpout 是对 spout 组件的顶层抽象,声明了一个 spout 应该具备的基本操作,实现如下:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
open 方法会在 spout 组件所属 task 被所在 worker 初始化时进行调用。我们一般在该方法中实现一些初始化逻辑,而不是在 spout 类的构造方法中进行,因为相应节点通过反序列化的方式获取 spout 对象,其构造方法不一定会被调用。
close 方法会在 spout 被销毁时调用,但是 storm 并不保证该方法一定会被执行。
activate 方法和 deactivate 方法会在 spout 被置为活跃和非活跃状态时分别被调用,用户可以在其中实现相应的感知逻辑。
ack 和 fail 方法用于实现计算的可靠性,保证消息至少被消费一次(at least once)。
nextTuple 是 spout 的核心方法,用户可以实现该方法来向下游 bolt 发送消息,storm 会循环调用该方法从数据源拉取数据,并传递给业务执行。在原生 storm 中 ack、fail,以及 nextTuple 三个方法在同一个线程中被循环调用,所以三个方法都不应该是阻塞的,而 JStorm 则做了针对性的优化,将 nextTuple 和 ack/fail 逻辑分离开。
IBolt 接口
IBolt 是常见的对 bolt 组件的顶层抽象,声明了 bolt 应该具备的基本功能,实现如下:
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
}
prepare 方法类似于 ISpout 的 open 方法,会在 bolt 组件被反序列化时被调用,用于实现一些初始化逻辑,同样我们不应该将初始化逻辑实现在构造方法中。
cleanup 方法类似于 ISpout 的 close 方法,会在 bolt 对象被销毁时调用,storm 同样不保证该方法一定会被执行。
execute 方法是 bolt 的核心方法,用户可以在该方法中实现对数据的处理和发送给下游 bolt,如果开启了 ack 机制,那么对消息的 ack 和 fail 也同样在该方法中进行,以保证消息被正确不丢失的处理。
public interface IRichBolt extends IBolt, IComponent { }
IRichBolt 接口继承自 IBolt 和 IComponent,组合了这两个接口所定义的功能,所以它并不是一种新的 bolt 接口。
IBasicBolt 接口
IBasicBolt 也是一个顶层的 bolt 组件抽象,区别 IBolt 的地方在于使用了 BasicOutputCollector 作为输出收集器,并且是在 execute 方法中传入的,接口定义如下:
public interface IBasicBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context);
void execute(Tuple input, BasicOutputCollector collector);
void cleanup();
}
IBasicBolt 接口存在的意义在于为用户提供一种更加简单的方式实现 bolt,用户不需要考虑消息的 ack 和 fail 等操作,而是由 storm 自动完成。在利用 TopologyBuilder 构建 topology 时,如果输入的是 IBasicBolt 类型,那么 TopologyBuilder 会自动用 BasicBoltExecutor 对用户自定义的 bolt 进行包装:
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
而 BasicBoltExecutor#execute
方法在执行的时候自动处理了 ack 和 fail 逻辑,用到的输出收集器就是 BasicOutputCollector,实现如下:
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch (FailedException e) {
if (e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);
}
}
IBatchBolt 接口
IBatchBolt 接口主要用于定义支持批处理的 bolt,例如 trident、事务处理等。接口定义如下:
public interface IBatchBolt<T> extends Serializable, IComponent {
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
void execute(Tuple tuple);
void finishBatch();
}
相对于 IBolt 和 IBasicBolt 而言,IBatchBolt 去掉了 cleanup 方法,取而代之的是 finishBatch,该方法会在当前 batch 之前的所有 batch 都被处理成功时被调用。有一点与 IBasicBolt 相同,IBatchBolt 的用户也无需关心 ack 和 fail 逻辑,storm 会自动进行处理,相关实现位于 BatchBoltExecutor#execute
方法中:
public void execute(Tuple input) {
Object id = input.getValue(0);
IBatchBolt bolt = this.getBatchBolt(id);
try {
bolt.execute(input);
_collector.ack(input);
} catch (FailedException e) {
LOG.error("Failed to process tuple in batch", e);
_collector.fail(input);
}
}
这里的输出收集器是 BatchOutputCollector 的实现。
2.4 输出收集器
在 spout 和 bolt 的初始化方法中都有一个输出收集器参数 OutputCollector,输出收集器主要用于对当前组件收到的消息进行进一步的处理,包括 emit、ack,以及 fail 等操作。
Spout 输出收集器
ISpoutOutputCollector 接口定义了 spout 的输出收集器,用于 spout 对于消息的进一步控制。该接口定义如下:
public interface ISpoutOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
void reportError(Throwable error);
}
emit 和 emitDirect 都用于向下游发送数据,区别在于 emitDirect 发出的消息只会被 taskId 参数所指定的 Task 接收到,同时方法要求 streamId 对应的流必须被定义为直接流,接收端的 task 也必须以直接分组(Direct Grouping)的方式来接收消息。
后面在分析 worker 的启动与运行机制的时候将会看到 worker 在启动 task 时会调用 SpoutExecutors#init
方法,其中会调用 ISpout#open
方法将 SpoutOutputCollector 对象传递给对应的 spout。
Bolt 输出收集器
在介绍 Bolt 接口时我们知道围绕 bolt 定义了三种类型的接口:IBolt(or IRichBolt)、IBasicBolt,以及 IBatchBolt。针对每一种 bolt 接口类型也有对应的输出收集器,分别是 OutputCollector、BasicOutputCollector 和 BatchOutputCollector。
- OutputCollector
OutputCollector 实现了 IOutputCollector 接口,jstorm 又在此接口的基础上提供了抽象类 OutputCollectorCb,所以 OutputCollector 最终是继承自 OutputCollectorCb。IOutputCollector 接口定义如下:
public interface IOutputCollector extends IErrorReporter {
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
}
- BasicOutputCollector
BasicOutputCollector 实现了 IBasicOutputCollector 接口,由于不需要手动去调用 ack 和 fail,所以略去了相关方法。IBasicOutputCollector 接口定义如下:
public interface IBasicOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple);
void emitDirect(int taskId, String streamId, List<Object> tuple);
void reportError(Throwable t);
}
- BatchOutputCollector
BatchOutputCollector 是一个抽象类,并提供了 BatchOutputCollectorImpl 实现,BatchOutputCollectorImpl 本质上包装了 OutputCollector。BatchOutputCollector 抽象类定义如下:
public abstract class BatchOutputCollector {
public List<Integer> emit(List<Object> tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
}
public abstract List<Integer> emit(String streamId, List<Object> tuple);
public void emitDirect(int taskId, List<Object> tuple) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
public abstract void reportError(Throwable error);
}
本篇主要是对我们编程中直接接触到的 storm api 实现进行了简单的分析,从下一篇开始,我们将深入到 storm 的内部,一探这个流式计算引擎的运行机制。
(本篇完)
以上所述就是小编给大家介绍的《JStorm 源码解析:编程接口》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
编程语言实现模式
Terence Parr / 李袁奎、尧飘海 / 华中科技大学出版社 / 2012-3-20 / 72.00元
《编程语言实现模式》旨在传授开发语言应用(工具)的经验和理念,帮助读者构建自己的语言应用。这里的语言应用并非特指用编译器或解释器实现编程语言,而是泛指任何处理、分析、翻译输入文件的程序,比如配置文件读取器、数据读取器、模型驱动的代码生成器、源码到源码的翻译器、源码分析工具、解释器,以及诸如此类的工具。为此,作者举例讲解已有语言应用的工作机制,拆解、归纳出31种易于理解且常用的设计模式(每种都包括通......一起来看看 《编程语言实现模式》 这本书的介绍吧!
XML 在线格式化
在线 XML 格式化压缩工具
HEX HSV 转换工具
HEX HSV 互换工具