MapReduce编程实例

栏目: 服务器 · 发布时间: 5年前

MapReducer

public class RatingMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "videorating");
        job.setJarByClass(RatingMR.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        //设置 map ,reduce 类及 class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //设置输入输出路径,这里指定的输路径是 HDFS,输出路径是  Linux  的本地文件系统
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
        job.waitForCompletion(true);
    }

    //Map类,封装KV键值对 <k1,v1,k2,v2> , k1 是偏移量,v1 是这行数据,输出类型是这行文本 k2 ,浮点类型 v2
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
        //创建Text,FloatWritable 对象
        private Text video_name = new Text();
        private FloatWritable rating = new FloatWritable();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");

            //使用正则表达式匹配字符串只应包含浮点数,过滤掉小于 7 的无效字段
            if (str.length > 7) {
                video_name.set(str[0]);

                if (str[6].matches("\\d+.+")) {
                    //强制类型转换                    
                    float f = Float.parseFloat(str[6]);
                    rating.set(f);
                }
            }

            context.write(video_name, rating);
        }
    }

    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
        public void reduce(Text key, Iterable<FloatWritable> values,
            Context context) throws IOException, InterruptedException {
            float sum = 0;
            int l = 0;

            for (FloatWritable val : values) {
                l += 1;
                //累加value
                sum += val.get();
            }

            sum = sum / l;
            //取平均值
            context.write(key, new FloatWritable(sum));
        }
    }
}
public class VideoCount {
    //主函数
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        //设置生产 jar 包所使用的类
        job.setJarByClass(VideoCount.class);
        //设置 Map 类的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置 Reduce 类的输入输出类型    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置 Map, Reduce 类的 class 参数
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //指定格式化用的类型
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("hdfs://localhost:9000/youtubedata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
        //等待完成
        job.waitForCompletion(true);
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        //构造文本类Text对象、IntWritable 对象,也可以直接以匿名函数的方式创建
        private Text tx = new Text();

        //map 的逻辑,使用tab“\ t”分隔符来分割行,并将值存储在String Array中,以使一行中的所有列都存储在字符串数组中
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            //拿到每一行数据
            String line = value.toString();
            String[] str = line.split("\t");

            //过滤字段
            if (str.length > 5) {
                tx.set(str[3]);
            }

            //输出key,value
            context.write(tx, one);
        }
    }

    //编写 reduce,接收 map 阶段传来的 kv 键值对,输出的类型和传进来的类型一致
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        //reduce????
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
            int sum = 0;

            //累加求和评分
            for (IntWritable v : values) {
                sum += v.get()
            }

            //写出去
            context.write(key, new IntWritable(sum));
        }
    }
}

以上所述就是小编给大家介绍的《MapReduce编程实例》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Agile Web Application Development with Yii 1.1 and PHP5

Agile Web Application Development with Yii 1.1 and PHP5

Jeffrey Winesett / Packt Publishing / 2010-08-27

In order to understand the framework in the context of a real-world application, we need to build something that will more closely resemble the types of applications web developers actually have to bu......一起来看看 《Agile Web Application Development with Yii 1.1 and PHP5》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具