Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm

栏目: 编程工具 · 发布时间: 5年前

内容简介:Storm是一个实时计算框架,有开源的大神为我们搭好了平台,按照大神的玩法,Storm的作业是topology,而topolgy是由spout,blot组成,spout是取数据,blot是处理数据,一个topology由一个spout加多个blot组成。将topology丢到storm上就能跑起来,可以在本地模式下跑,也可以在集群模式下跑。Storm的使用,可以查看小伙伴田海龙的经典Blog示例。一个经典的wordcount示例。这个示范了取数据到处理数据的过程。实际使用Storm的时候, 通常是Storm
Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm
storm hello world & kafka storm

Storm是一个实时计算框架,有开源的大神为我们搭好了平台,按照大神的玩法,Storm的作业是topology,而topolgy是由spout,blot组成,spout是取数据,blot是处理数据,一个topology由一个spout加多个blot组成。将topology丢到storm上就能跑起来,可以在本地模式下跑,也可以在集群模式下跑。

Storm的使用,可以查看小伙伴田海龙的经典Blog示例。一个经典的wordcount示例。这个示范了取数据到处理数据的过程。实际使用Storm的时候, 通常是Storm+kafka,按照我们公司的日志系统的情况来看,是这么搭配。

玩Storm能增值不?

当然可以,提升自身身价。同时,也可以考虑 星火理财 专业手段直接增值 。新技术能提升个人的技术身价,不过,玩Storm,玩到什么程度又是一个境界了。这个要看如何平衡了。一般日志成型后,可能也不会怎么动了,一直在不断的接触新东西,而storm的源代码读过否?

Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm
storm与财富增值

Storm难吗?

如果说使用storm,相对说讲,还是比较easy。比如用storm写一个word count,或者用storm读取kafka的消息。一般来讲,拿来用是比较简单。如果要去深入看storm,就另说了。

Storm如何集成Kafka?

其实有的时候找来找去,很多答案就在github.com上面,Storm的官方源码中,有kafka的集成示范代码,如果看不明白的话,其实也没有很大的关系。因为我们不还可以找国内小伙伴分享的一些经典。

我的经验分享,也是我们结合网上教程实践一些心得。我们知道topology是由spout与bolt组成的,那么跟kafka集成,必然要一个kafkaSpout,这个有现成的,网上的代码只需要实现一个Scheme,对kafka的输出进行转化,把字节流转成字符串。简单的说,kafkaspout别人都写好了。只需要写一个bytebuffer to string的方法。

小坑之一: String 转UTF8

网上的代码报错,找了一段bytebuffer to string替换掉

小坑之二:zookeeper

到底是跟kafka共用一个,还是独立一套zookeeper给storm用。这个问题纠结了一下,因为zookeeper要是每人上集群都配置一套的话,会有好多zookeeper,不过,老司机校友也是这么推荐的,那就听老司机的。单独给storm部署一的大套。插曲:同时听我要zookeeper的同事吓一跳,因为TA觉得是给kafka用的,不想给我这个Storm用。而我们今天只是想把storm与kafka跑通来,并不想再去部署一个zookeeper。哈哈,最好还是蹭的kafka的zookeeper!

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦

Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm

有的人写Blog,帖很多代码,都不带写些文字,也有的人喜欢写很多文字,看不到代码。我喜欢全二为一,尽量把自己的一些想法加在里面。还记录一下坑啊什么的,可以查看田海龙的一个入门级demo  http://www.tianhailong.com/?p=1358

WordCounter.java:

package com.youku.demo.bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java:

package com.youku.demo.bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordReader.java:

package com.youku.demo.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }

    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }

    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

TopologyMain.java:

package com.youku.demo;

import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));

        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo-storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

words.txt:

storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt

因为入门的wordcount单独展示的,会给人感觉没有太大的实用价值。将kafka与storm的放一起,可以更好的理解怎么与kafka集成。

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦。

Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm

赞助商小伙伴链接:

如何享受宜信星火金服宜心理财:

  1. 扫码二维码
  2. 通过 宜信星火金服活动 链接  http://www.ixinghuo.com/qcode.php?yixinqcode
  3. 通过 宜信星火金服理财师店铺 链接: https://xinghuo.yixin.com/yiidea
  4. 通过 宜信星火金服理财师移动端邀请页 面  https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
    5.通过宜信星火金服理财师店铺宜心理财团队短链接:

http://yixin.hk

http://yixin.ceo

http://yue.ma

  1. 通过 宜信星火金服宜心理财团队 网站页面

http://www.yixinlicai.com.cn

  1. 通过   宜信.公司  |  宜信.网络  |  宜信.net

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

软件开发者路线图

软件开发者路线图

Dave H. Hoover、Adewale Oshineye / 王江平 / 机械工业出版社 / 2010年9月 / 35.00元

作为一名软件开发者,你在奋力推进自己的职业生涯吗?面对今天日新月异和不断拓展的技术,取得成功需要的不仅仅是技术专长。为了增强专业性,你还需要一些软技能以及高效的学习技能。本书的全部内容都是关于如何修炼这些技能的。两位作者Dave Hoover和Adewale Oshineye给出了数十种行为模式,来帮你提高主要的技能。 本书中的模式凝结了多年的调查研究、无数次的访谈以及来自O’Reilly在......一起来看看 《软件开发者路线图》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具