1. 依赖
<!-- Maven dependencies for the Storm + HBase word-count example below.
     guava is excluded from storm-hbase and declared separately at version 21.0.
     slf4j-log4j12 is excluded from every Hadoop artifact (presumably to avoid a clash with
     another SLF4J binding on the classpath; confirm against your build).
     NOTE: ${hadoop.version} is referenced here but not defined in this snippet; it has to be
     declared in the POM's <properties> section. -->
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hbase</artifactId> <version>1.1.1</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> </exclusions> <type>jar</type> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
2. HBase 创建表
# Run in the HBase shell: creates table 'wc' with a single column family 'cf'.
create 'wc','cf'
3. 实现
package com.waiting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.io.IOException; import java.util.*; public class LocalWordCountHbaseStormTopology { public static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public static final String[] words = new String[]{"apple", "orange", "pineapple", "bannaer"}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.collector.emit(new Values(word)); System.out.println("word:" + word); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line") ); } } public static class SplitBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = input.getStringByField("line"); this.collector.emit(new Values(word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class CountBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } Map<String, Integer> map = new HashMap<String, Integer>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = map.get(word); if(count == null){ count = 0; } count ++; map.put(word, count); this.collector.emit(new Values(word, map.get(word))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class MyHBaseBolt extends BaseBasicBolt { private Connection connection; private Table table; @Override public void prepare(Map stormConf, TopologyContext context) { Configuration config = HBaseConfiguration.create(); config.set("hbase.rootdir","hdfs://localhost:9000/hbase"); try { connection = ConnectionFactory.createConnection(config); //示例都是对同一个table进行操作,因此直接将Table对象的创建放在了prepare,在bolt执行过程中可以直接重用。 table = connection.getTable(TableName.valueOf("wc")); } catch (IOException e) { //do something to handle exception e.printStackTrace(); } } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { String word = tuple.getStringByField("word"); Integer count = tuple.getIntegerByField("count"); try { //以各个单词作为row key Put put = new Put(Bytes.toBytes(word)); //将被计数的单词写入cf:words列 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(word)); //将单词的计数写入cf:counts列 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count)); table.put(put); } catch (IOException e) { //do 
something to handle exception e.printStackTrace(); } } @Override public void cleanup() { //关闭table try { if(table != null) table.close(); } catch (Exception e){ //do something to handle exception } finally { //在finally中关闭connection try { connection.close(); } catch (IOException e) { //do something to handle exception } } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //示例中本bolt不向外发射数据,所以没有再做声明 } } public static void main(String[] args){ Config config = new Config(); Map<String, Object> hbaseConf = new HashMap<String, Object>(); hbaseConf.put("hbase.rootdir", "hdfs://localhost:9000/hbase"); config.put("hbase.conf", hbaseConf); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); builder.setBolt("MyHBaseBolt", new MyHBaseBolt()).shuffleGrouping("CountBolt"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountStormTopology", config, builder.createTopology()); } }4551
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持码农网。
猜你喜欢:
- SpringBoot整合MybatisPlus的简单教程(简单整合)
- springmvc教程--整合mybatis开发(spring+springMVC+mybatis整合开发)
- springboot整合springsecurity从Hello World到源码解析(五):springsecurity+jwt整合restful服务
- SSM整合搭建(二)
- SSM整合
- storm 整合hdfs
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。