内容简介:这包括新的Apache Kafka消费者API。兼容性 Apache Kafka版本0.10起 引入jar包原文链接:通过使用KafkaSpoutConfig类来配置spout实现。此类使用Builder模式,可以通过调用其中一个Builders构造函数或通过调用KafkaSpoutConfig类中的静态方法构建器来启动。
这包括新的Apache Kafka消费者API。兼容性 Apache Kafka版本0.10起 引入jar包
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.2.0</version>
</dependency>
复制代码
原文链接: a870439570.github.io/interview-d…
从kafka中订阅消息读取
通过使用KafkaSpoutConfig类来配置spout实现。此类使用Builder模式,可以通过调用其中一个Builders构造函数或通过调用KafkaSpoutConfig类中的静态方法构建器来启动。
用法示例
创建一个简单的不kafka数据源 以下将使用发布到“topic”的所有事件,并将它们发送到MyBolt,其中包含“topic”,“partition”,“offset”,“key”,“value”字段。
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout(KafkaSpoutConfig.builder("localhost:9092" , "qxw").build()), 1);
tp.setBolt("bolt", new MyBolt()).shuffleGrouping("kafka_spout");
Config cfg=new Config();
cfg.setNumWorkers(1);//指定工作进程数 (jvm数量,分布式环境下可用,本地模式设置无意义)
cfg.setDebug(true);
LocalCluster locl=new LocalCluster();
locl.submitTopology("kkafka-topo",cfg,tp.createTopology());
复制代码
public static class MyBolt extends BaseBasicBolt{
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.err.println("接受订阅kafka消息: "+tuple.getStringByField("topic"));
System.err.println("接受订阅kafka消息: "+tuple.getStringByField("value"));
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
复制代码
以上所述就是小编给大家介绍的《Storm系列(六)storm和kafka集成》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 持续集成:数据库集成及快速构建
- ShareSDK集成及集成后遇到的一些问题【原创】
- 持续集成与持续部署宝典Part 3:创建集成环境
- 持续集成与持续部署宝典Part 2:创建持续集成流水线
- 禅道 12.3.stable 版本发布,全面集成八种单元测试框架,打通持续集成闭环
- 持续集成将死
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
人人都是架构师:分布式系统架构落地与瓶颈突破
高翔龙 / 电子工业出版社 / 2017-5 / 69
《人人都是架构师:分布式系统架构落地与瓶颈突破》并没有过多渲染系统架构的理论知识,而是切切实实站在开发一线角度,为各位读者诠释了大型网站在架构演变过程中出现一系列技术难题时的解决方案。《人人都是架构师:分布式系统架构落地与瓶颈突破》首先从分布式服务案例开始介绍,重点为大家讲解了大规模服务化场景下企业应该如何实施服务治理;然后在大流量限流/消峰案例中,笔者为大家讲解了应该如何有效地对流量实施管制,避......一起来看看 《人人都是架构师:分布式系统架构落地与瓶颈突破》 这本书的介绍吧!