MapReduce编程实例

栏目: 服务器 · 发布时间: 5年前

MapReducer

public class RatingMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "videorating");
        job.setJarByClass(RatingMR.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        //设置 map ,reduce 类及 class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //设置输入输出路径,这里指定的输路径是 HDFS,输出路径是  Linux  的本地文件系统
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
        job.waitForCompletion(true);
    }

    //Map类,封装KV键值对 <k1,v1,k2,v2> , k1 是偏移量,v1 是这行数据,输出类型是这行文本 k2 ,浮点类型 v2
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
        //创建Text,FloatWritable 对象
        private Text video_name = new Text();
        private FloatWritable rating = new FloatWritable();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");

            //使用正则表达式匹配字符串只应包含浮点数,过滤掉小于 7 的无效字段
            if (str.length > 7) {
                video_name.set(str[0]);

                if (str[6].matches("\\d+.+")) {
                    //强制类型转换                    
                    float f = Float.parseFloat(str[6]);
                    rating.set(f);
                }
            }

            context.write(video_name, rating);
        }
    }

    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        public void reduce(Text key, Iterable<FloatWritable> values,
            Context context) throws IOException, InterruptedException {
            float sum = 0;
            int l = 0;

            for (FloatWritable val : values) {
                l += 1;
                //累加value
                sum += val.get();
            }

            sum = sum / l;
            //取平均值
            context.write(key, new FloatWritable(sum));
        }
    }
}
public class VideoCount {
    //主函数
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        //设置生产 jar 包所使用的类
        job.setJarByClass(VideoCount.class);
        //设置 Map 类的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置 Reduce 类的输入输出类型    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置 Map, Reduce 类的 class 参数
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //指定格式化用的类型
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
        //等待完成
        job.waitForCompletion(true);
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        //构造文本类Text对象、IntWritable 对象,也可以直接以匿名函数的方式创建
        private Text tx = new Text();

        //map 的逻辑,使用tab“\ t”分隔符来分割行,并将值存储在String Array中,以使一行中的所有列都存储在字符串数组中
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            //拿到每一行数据
            String line = value.toString();
            String[] str = line.split("\t");

            //过滤字段
            if (str.length > 5) {
                tx.set(str[3]);
            }

            //输出key,value
            context.write(tx, one);
        }
    }

    //编写 reduce,接收 map 阶段传来的 kv 键值对,输出的类型和传进来的类型一致
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        //reduce????
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
            int sum = 0;

            //累加求和评分
            for (IntWritable v : values) {
                sum += v.get()
            }

            //写出去
            context.write(key, new IntWritable(sum));
        }
    }
}

以上所述就是小编给大家介绍的《MapReduce编程实例》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创业维艰

创业维艰

本·霍洛维茨 Ben Horowitz / 杨晓红、钟莉婷 / 中信出版社 / 2015-2 / 49

本·霍洛维茨,硅谷顶级投资人,与网景之父马克·安德森联手合作18年,有着丰富的创业和管理经验。2009年创立风险投资公司A16Z,被外媒誉为“硅谷最牛的50个天使投资人”之一,先后在初期投资了Facebook、Twitter、Groupon、Skype,是诸多硅谷新贵的创业导师。 在《创业维艰》中,本·霍洛维茨从自己的创业经历讲起,以自己在硅谷近20余年的创业、管理和投资经验,对创业公司(尤......一起来看看 《创业维艰》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器