MapReduce编程实例

栏目: 服务器 · 发布时间: 6年前

MapReducer

public class RatingMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "videorating");
        job.setJarByClass(RatingMR.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        //设置 map ,reduce 类及 class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //设置输入输出路径,这里指定的输路径是 HDFS,输出路径是  Linux  的本地文件系统
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
        job.waitForCompletion(true);
    }

    //Map类,封装KV键值对 <k1,v1,k2,v2> , k1 是偏移量,v1 是这行数据,输出类型是这行文本 k2 ,浮点类型 v2
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
        //创建Text,FloatWritable 对象
        private Text video_name = new Text();
        private FloatWritable rating = new FloatWritable();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");

            //使用正则表达式匹配字符串只应包含浮点数,过滤掉小于 7 的无效字段
            if (str.length > 7) {
                video_name.set(str[0]);

                if (str[6].matches("\\d+.+")) {
                    //强制类型转换                    
                    float f = Float.parseFloat(str[6]);
                    rating.set(f);
                }
            }

            context.write(video_name, rating);
        }
    }

    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        public void reduce(Text key, Iterable<FloatWritable> values,
            Context context) throws IOException, InterruptedException {
            float sum = 0;
            int l = 0;

            for (FloatWritable val : values) {
                l += 1;
                //累加value
                sum += val.get();
            }

            sum = sum / l;
            //取平均值
            context.write(key, new FloatWritable(sum));
        }
    }
}
public class VideoCount {
    //主函数
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        //设置生产 jar 包所使用的类
        job.setJarByClass(VideoCount.class);
        //设置 Map 类的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置 Reduce 类的输入输出类型    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置 Map, Reduce 类的 class 参数
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //指定格式化用的类型
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
        //等待完成
        job.waitForCompletion(true);
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        //构造文本类Text对象、IntWritable 对象,也可以直接以匿名函数的方式创建
        private Text tx = new Text();

        //map 的逻辑,使用tab“\ t”分隔符来分割行,并将值存储在String Array中,以使一行中的所有列都存储在字符串数组中
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            //拿到每一行数据
            String line = value.toString();
            String[] str = line.split("\t");

            //过滤字段
            if (str.length > 5) {
                tx.set(str[3]);
            }

            //输出key,value
            context.write(tx, one);
        }
    }

    //编写 reduce,接收 map 阶段传来的 kv 键值对,输出的类型和传进来的类型一致
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        //reduce????
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
            int sum = 0;

            //累加求和评分
            for (IntWritable v : values) {
                sum += v.get()
            }

            //写出去
            context.write(key, new IntWritable(sum));
        }
    }
}

以上所述就是小编给大家介绍的《MapReduce编程实例》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript实战

JavaScript实战

Frank W. Zammetti / 张皛珏 / 人民邮电出版社 / 2009-8 / 59.00元

随着Ajax的兴起,JavaScript迅速地从改进网站的配角晋升为开发专业级高质量应用的主角,成为了Web开发中不可缺少的一员。 本书主要通过10个具体项目,包括构建可扩展的JavaScript库、使用GUI窗口小部件框架、开发支持拖放的购物车和编写JavaScript游戏等,讲述JavaScript最佳实践、Ajax技术,以及一些流行的JavaScript库,如Rico、Dojo、scr......一起来看看 《JavaScript实战》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具