1. Dependencies
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>${storm.version}</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
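The ${storm.version} and ${hadoop.version} placeholders above must be defined in the POM's <properties> section. A minimal sketch, assuming Storm 1.2.2 and Hadoop 2.6.0 (hypothetical versions; substitute whatever matches your cluster):

<properties>
    <!-- assumed versions for illustration; align these with your cluster -->
    <storm.version>1.2.2</storm.version>
    <hadoop.version>2.6.0</hadoop.version>
</properties>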
2. Code
package com.waiting;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class LocalWordCountHDFSStormTopology {

    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        public static final String[] words = new String[]{"apple", "orange", "pineapple", "banana"};

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // emit one random word per second
            Random random = new Random();
            String word = words[random.nextInt(words.length)];
            this.collector.emit(new Values(word));
            System.out.println("word:" + word);
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    public static class SplitBolt extends BaseRichBolt {

        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // each "line" emitted by the spout is already a single word,
            // so this bolt simply relays it downstream as field "word"
            String word = input.getStringByField("line");
            this.collector.emit(new Values(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

        // use "|" instead of the default "," as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // sync the filesystem after every 10 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(10);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        // write output files under /foo/ on HDFS
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/foo/");

        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://localhost:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        builder.setBolt("HdfsBolt", bolt).shuffleGrouping("SplitBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology());
    }
}
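Once the topology is running, the HdfsBolt appends one "|"-delimited record per tuple to a file under /foo/ on the configured HDFS (hdfs://localhost:9000 here), flushing to the filesystem every 10 tuples and rolling to a new file whenever the current one reaches 5 MB. Assuming a local single-node HDFS, you can watch the output files appear with `hdfs dfs -ls /foo` and inspect their contents with `hdfs dfs -cat`.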