MapReduce Programming Examples

Two complete examples follow, both reading a tab-separated YouTube dataset from HDFS. The first, RatingMR, computes the average rating of each video:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RatingMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "videorating");
        job.setJarByClass(RatingMR.class);
        // map and reduce output types, plus the Mapper/Reducer classes
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // input/output paths: the input is an explicit HDFS URI; the output path
        // resolves against the default filesystem (the local Linux filesystem here)
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
        job.waitForCompletion(true);
    }

    // Mapper: the input pair <k1, v1> is <byte offset, line of text>; the output
    // pair <k2, v2> is <video id (Text), rating (FloatWritable)>
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
        // reusable output key/value objects
        private Text video_name = new Text();
        private FloatWritable rating = new FloatWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");
            // skip rows with too few columns, and use a regex to ensure the rating
            // column holds nothing but a floating-point number
            if (str.length > 7 && str[6].matches("\\d+(\\.\\d+)?")) {
                video_name.set(str[0]);
                rating.set(Float.parseFloat(str[6]));
                context.write(video_name, rating);
            }
        }
    }

    // Reducer: averages all ratings seen for each video
    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        @Override
        public void reduce(Text key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float sum = 0;
            int l = 0;
            for (FloatWritable val : values) {
                l += 1;
                sum += val.get(); // accumulate the ratings
            }
            sum = sum / l; // take the average
            context.write(key, new FloatWritable(sum));
        }
    }
}
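The fragile part of this job is the map-side parsing, so it is worth checking in isolation. Below is a minimal standalone sketch (no Hadoop dependencies) that runs the same split-and-guard logic over a hypothetical input line; the column layout (video id in column 0, rating in column 6, at least 8 tab-separated columns) matches what RatingMR assumes, but the sample values are invented for illustration.

public class RatingParseCheck {
    public static void main(String[] args) {
        // hypothetical row: 8 tab-separated columns, with the rating "4.5" in column 6
        String line = "vid123\tuploader\t2008\tMusic\t120\t9000\t4.5\t300";
        String[] str = line.split("\t");
        // same guards as RatingMR.Map: enough columns, and a purely numeric rating
        if (str.length > 7 && str[6].matches("\\d+(\\.\\d+)?")) {
            System.out.println(str[0] + " -> " + Float.parseFloat(str[6]));
        } else {
            System.out.println("row skipped");
        }
    }
}

A row with too few columns, or one whose rating column reads something like "N/A", falls through to the skipped branch instead of killing the mapper with a NumberFormatException.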
The second example, VideoCount, counts how many videos fall into each category:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class VideoCount {

    // driver
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "categories");
        // class used to locate the job jar
        job.setJarByClass(VideoCount.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Mapper and Reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
        // block until the job finishes
        job.waitForCompletion(true);
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        // reusable Text object for the output key (it could also be created inline)
        private Text tx = new Text();

        // split each line on the tab separator so that every column of the row
        // lands in one slot of the string array
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // take one line of input
            String line = value.toString();
            String[] str = line.split("\t");
            // only emit rows that actually have a category column (index 3)
            if (str.length > 5) {
                tx.set(str[3]);
                // emit <category, 1>
                context.write(tx, one);
            }
        }
    }

    // Reducer: receives the <category, [1, 1, ...]> pairs from the map phase and
    // emits <category, count>; the output types match the input types
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // add up the ones to count the videos in this category
            for (IntWritable v : values) {
                sum += v.get();
            }
            // write out the total
            context.write(key, new IntWritable(sum));
        }
    }
}
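Because VideoCount's reducer just sums IntWritable counts, the operation is associative and commutative, so the very same class can double as a combiner and pre-aggregate on the map side, shrinking shuffle traffic. The sketch below shows this optional one-line addition to the driver (not part of the original example):

// in VideoCount.main(), between setMapperClass and setReducerClass:
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class); // optional: pre-aggregate <category, 1> pairs on the map side
job.setReducerClass(Reduce.class);

Note that RatingMR's averaging reducer cannot be reused this way: an average of partial averages is not, in general, the overall average.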
That wraps up this introduction to "MapReduce Programming Examples". I hope it helps; if you have any questions, leave me a comment and I will reply as soon as I can. Many thanks for supporting 码农网!