Storm Integration with HBase


1. Dependencies

storm-hbase pulls in the HBase client libraries used below. Its transitive Guava is excluded and a newer Guava (21.0) is declared explicitly to avoid version conflicts, and slf4j-log4j12 is excluded from the Hadoop artifacts so that only one SLF4J binding ends up on the classpath. ${hadoop.version} is a Maven property that should match your cluster's Hadoop version.

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-hbase</artifactId>
  <version>1.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </exclusion>
  </exclusions>
  <type>jar</type>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>21.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
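
Incidentally, the storm-hbase artifact declared above also ships a ready-made HBaseBolt plus a declarative tuple-to-row mapper, so the hand-written HBase bolt in section 3 is not the only option. Below is a minimal sketch of the alternative wiring, assuming the storm-hbase 1.1.1 API; the bolt names, fields, and the "hbase.conf" config key mirror the main() method in the listing further down (this sketch is ours, not from the original article):

import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.tuple.Fields;

// Row key = the "word" field; both fields are written into column family "cf",
// mirroring what the hand-rolled MyHBaseBolt below does with the raw client.
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word", "count"))
        .withColumnFamily("cf");

// "hbase.conf" is the key under which main() stores the HBase settings in the
// topology Config; HBaseBolt reads that nested map for its connection setup.
HBaseBolt hbaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");
builder.setBolt("HBaseBolt", hbaseBolt).shuffleGrouping("CountBolt");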

2. Create the HBase table

create 'wc','cf'
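
If you prefer to create the table from code rather than the shell, the HBase 1.x client's Admin API can do the same thing. A minimal sketch under that assumption (the CreateWcTable class name is ours, not from the original):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(config);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("wc");
            // Equivalent of the shell command: create 'wc','cf'
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        }
    }
}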

3. Implementation

package com.waiting;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.io.IOException;
import java.util.*;

public class LocalWordCountHbaseStormTopology {


    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }


        private static final String[] WORDS = {"apple", "orange", "pineapple", "banana"};
        private final Random random = new Random();

        @Override
        public void nextTuple() {
            // Emit one random word per second.
            String word = WORDS[random.nextInt(WORDS.length)];
            this.collector.emit(new Values(word));
            System.out.println("word:" + word);
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields("line"));
        }
    }

    public static class SplitBolt extends BaseRichBolt {

        private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // The spout already emits single words, so this "split" step is just a
            // pass-through that renames the field from "line" to "word".
            String word = input.getStringByField("line");
            this.collector.emit(new Values(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("word"));
        }
    }

    public static class CountBolt extends BaseRichBolt {

        private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        private final Map<String, Integer> map = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            // Update the running count for this word (held in memory, per task).
            Integer count = map.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            map.put(word, count);

            this.collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }




    public static class MyHBaseBolt extends BaseBasicBolt {
        private Connection connection;
        private Table table;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.rootdir","hdfs://localhost:9000/hbase");
            try {
                connection = ConnectionFactory.createConnection(config);
                // The whole example operates on a single table, so the Table object is
                // created once in prepare() and reused while the bolt is running.
                table = connection.getTable(TableName.valueOf("wc"));
            } catch (IOException e) {
                //do something to handle exception
                e.printStackTrace();
            }
        }
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            try {
                // Use the word itself as the row key.
                Put put = new Put(Bytes.toBytes(word));
                // Write the word into the cf:word column.
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(word));
                // Write the word's count into the cf:count column.
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count));
                table.put(put);
            } catch (IOException e) {
                //do something to handle exception
                e.printStackTrace();
            }
        }
        @Override
        public void cleanup() {
            // Close the table first.
            try {
                if (table != null) table.close();
            } catch (Exception e){
                //do something to handle exception
            } finally {
                // Close the connection in finally so it is released even if closing the table fails.
                try {
                    if (connection != null) connection.close();
                } catch (IOException e) {
                    //do something to handle exception
                }
            }
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // This bolt emits nothing downstream, so no output fields are declared.
        }
    }


    public static void main(String[] args){

        Config config = new Config();

        Map<String, Object> hbaseConf = new HashMap<String, Object>();

        hbaseConf.put("hbase.rootdir", "hdfs://localhost:9000/hbase");
        config.put("hbase.conf", hbaseConf);
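        // Note: the "hbase.conf" entry is not read by MyHBaseBolt above; it is the
        // config key that storm-hbase's HBaseBolt consumes (see the sketch in section 1).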

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
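        // If CountBolt ever runs with parallelism > 1, use fieldsGrouping(new Fields("word"))
        // instead, so that all tuples for the same word reach the same counting task.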

        builder.setBolt("MyHBaseBolt", new MyHBaseBolt()).shuffleGrouping("CountBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountHbaseStormTopology", config, builder.createTopology());
    }
}
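
Once the topology has run for a while, the counts can be read back with the same client API. A minimal verification sketch (the WcScan class name is ours); since MyHBaseBolt stores the count with Bytes.toBytes(int), it is decoded here with Bytes.toInt:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WcScan {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        try (Connection conn = ConnectionFactory.createConnection(config);
             Table table = conn.getTable(TableName.valueOf("wc"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Print every row: the word (row key) and its current count.
            for (Result r : scanner) {
                byte[] raw = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
                if (raw != null) {
                    System.out.println(Bytes.toString(r.getRow()) + " : " + Bytes.toInt(raw));
                }
            }
        }
    }
}

The same check works from the HBase shell with: scan 'wc'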