MapReduce编程实例

栏目: 服务器 · 发布时间: 5年前

MapReducer

public class RatingMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "videorating");
        job.setJarByClass(RatingMR.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        //设置 map ,reduce 类及 class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //设置输入输出路径,这里指定的输路径是 HDFS,输出路径是  Linux  的本地文件系统
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
        job.waitForCompletion(true);
    }

    //Map类,封装KV键值对 <k1,v1,k2,v2> , k1 是偏移量,v1 是这行数据,输出类型是这行文本 k2 ,浮点类型 v2
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
        //创建Text,FloatWritable 对象
        private Text video_name = new Text();
        private FloatWritable rating = new FloatWritable();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");

            //使用正则表达式匹配字符串只应包含浮点数,过滤掉小于 7 的无效字段
            if (str.length > 7) {
                video_name.set(str[0]);

                if (str[6].matches("\\d+.+")) {
                    //强制类型转换                    
                    float f = Float.parseFloat(str[6]);
                    rating.set(f);
                }
            }

            context.write(video_name, rating);
        }
    }

    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        public void reduce(Text key, Iterable<FloatWritable> values,
            Context context) throws IOException, InterruptedException {
            float sum = 0;
            int l = 0;

            for (FloatWritable val : values) {
                l += 1;
                //累加value
                sum += val.get();
            }

            sum = sum / l;
            //取平均值
            context.write(key, new FloatWritable(sum));
        }
    }
}
public class VideoCount {
    //主函数
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        //设置生产 jar 包所使用的类
        job.setJarByClass(VideoCount.class);
        //设置 Map 类的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置 Reduce 类的输入输出类型    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置 Map, Reduce 类的 class 参数
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //指定格式化用的类型
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
        //等待完成
        job.waitForCompletion(true);
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        //构造文本类Text对象、IntWritable 对象,也可以直接以匿名函数的方式创建
        private Text tx = new Text();

        //map 的逻辑,使用tab“\ t”分隔符来分割行,并将值存储在String Array中,以使一行中的所有列都存储在字符串数组中
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            //拿到每一行数据
            String line = value.toString();
            String[] str = line.split("\t");

            //过滤字段
            if (str.length > 5) {
                tx.set(str[3]);
            }

            //输出key,value
            context.write(tx, one);
        }
    }

    //编写 reduce,接收 map 阶段传来的 kv 键值对,输出的类型和传进来的类型一致
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        //reduce????
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
            int sum = 0;

            //累加求和评分
            for (IntWritable v : values) {
                sum += v.get()
            }

            //写出去
            context.write(key, new IntWritable(sum));
        }
    }
}

以上所述就是小编给大家介绍的《MapReduce编程实例》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

参与感

参与感

黎万强 / 中信出版社 / 2014-8 / 56.00元

◆雷军亲笔作序,小米联合创始人黎万强著。 ◆揭开小米4年600亿奇迹背后的理念、方法和案例。 ◆了解小米,必读本书! ◆迄今为止关于小米最权威、最透彻、最全面的著作! ◆新一代营销圣经。 ◆2014年最重磅图书! ◆再掀企业界全民学习小米热潮! ◆引爆出版界、财经界、IT界、科技界大震荡的“现象级热书”! ◆全书四色印刷,包含46张海报级插图。 ......一起来看看 《参与感》 这本书的介绍吧!

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具