Integrating Storm with HBase


1. Dependencies

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-hbase</artifactId>
  <version>1.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </exclusion>
  </exclusions>
  <type>jar</type>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>21.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
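
The ${hadoop.version} placeholder is a Maven property that is not shown above; it must be defined in the POM (or inherited from a parent POM). A minimal sketch, with 2.6.5 used here purely as an example value:

<properties>
  <!-- example value only; use the Hadoop version your cluster actually runs -->
  <hadoop.version>2.6.5</hadoop.version>
</properties>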

2. Create the table in HBase

create 'wc','cf'
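
Each word will become a row key, and both the word and its count are written under the cf column family (see the bolt below). Once the topology has been running for a while, the writes can be verified from the HBase shell, for example:

scan 'wc'
get 'wc', 'apple'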

3. Implementation

package com.waiting;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.io.IOException;
import java.util.*;

public class LocalWordCountHbaseStormTopology {


    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }


        public static final String[] words = new String[]{"apple", "orange", "pineapple", "banana"};

        @Override
        public void nextTuple() {
             Random random = new Random();
             String word = words[random.nextInt(words.length)];

             this.collector.emit(new Values(word));

             System.out.println("word:" + word);

            Utils.sleep(1000);

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields("line"));
        }
    }

    public static class SplitBolt extends BaseRichBolt{

        private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("line");
            this.collector.emit(new Values(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("word"));
        }
    }

    public static class CountBolt extends BaseRichBolt{

        private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        Map<String, Integer> map = new HashMap<String, Integer>();
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = map.get(word);
            if(count == null){
                count = 0;
            }
            count ++;
            map.put(word, count);

            this.collector.emit(new Values(word, map.get(word)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }




    public static class MyHBaseBolt extends BaseBasicBolt {
        private Connection connection;
        private Table table;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.rootdir","hdfs://localhost:9000/hbase");
            try {
                connection = ConnectionFactory.createConnection(config);
                // The example always operates on the same table, so the Table object is created once in prepare() and reused while the bolt executes.
                table = connection.getTable(TableName.valueOf("wc"));
            } catch (IOException e) {
                //do something to handle exception
                e.printStackTrace();
            }
        }
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            try {
                // Use each word as the row key
                Put put = new Put(Bytes.toBytes(word));
                // Write the word being counted to the cf:word column
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(word));
                // Write the word's count to the cf:count column
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count));
                table.put(put);
            } catch (IOException e) {
                //do something to handle exception
                e.printStackTrace();
            }
        }
        @Override
        public void cleanup() {
            // Close the table
            try {
                if(table != null) table.close();
            } catch (Exception e){
                //do something to handle exception
            } finally {
                // Close the connection in the finally block
                try {
                    connection.close();
                } catch (IOException e) {
                    //do something to handle exception
                }
            }
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // In this example the bolt does not emit tuples downstream, so no output fields are declared.
        }
    }


    public static void main(String[] args){

        Config config = new Config();

        Map<String, Object> hbaseConf = new HashMap<String, Object>();

        hbaseConf.put("hbase.rootdir", "hdfs://localhost:9000/hbase");
        config.put("hbase.conf", hbaseConf);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        builder.setBolt("MyHBaseBolt", new MyHBaseBolt()).shuffleGrouping("CountBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountHbaseStormTopology", config, builder.createTopology());
    }
}
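
Note that MyHBaseBolt talks to HBase through the client API directly and never reads the "hbase.conf" map that main() puts into the topology config; that entry is only meaningful to the bolts shipped with storm-hbase. Since storm-hbase is already a dependency, an alternative (not what this article's code does, just a sketch) is to replace MyHBaseBolt with the library's HBaseBolt plus a SimpleHBaseMapper, which picks up the HBase settings via withConfigKey("hbase.conf"). The fragment below would sit in main() in place of the MyHBaseBolt wiring, with two extra imports (Fields is already imported above):

import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

        // Row key = the word; write the word and its latest running count as regular columns
        // under cf, mirroring what MyHBaseBolt does with its Put.
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("cf");

        // "wc" is the table created in step 2; "hbase.conf" is the key under which main()
        // stored the HBase settings (hbase.rootdir etc.), so the bolt builds its own connection from them.
        HBaseBolt hbaseBolt = new HBaseBolt("wc", mapper)
                .withConfigKey("hbase.conf");

        builder.setBolt("HBaseBolt", hbaseBolt, 1).shuffleGrouping("CountBolt");

One caveat with this variant: HBaseBolt batches its writes internally, so in a slow demo like this one rows may take a moment to show up in the table.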

That's all for this article; hopefully it is of some help for your study or work.
