MapReduce实践 Youtube数据分析

栏目: 编程工具 · 发布时间: 6年前

内容简介:这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。

这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。

使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。

数据

数据下载

数据展示

MapReduce实践 Youtube数据分析

数据说明

Column 1: Video id of 11 characters.

Column 2: uploader of the video

Column 3: Interval between the day of establishment of Youtube and the date of uploading of the video.

Column 4: Category of the video.

Column 5: Length of the video.

Column 6: Number of views for the video.

Column 7: Rating on the video.

Column 8: Number of ratings given for the video

Column 9: Number of comments done on the videos.

Column 10: Related video ids with the uploaded video.

问题1:寻找Top 5视频类别

Mapper

对每一行数据进行划分,统计各个视频类别的数量(Column 4)。数据集中部分数据缺失,因此忽略了划分后少于5个属性的数据。

public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1); // 值为1
    private Text category = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        String[] attributeArray = value.toString().split("\t");// 对字符串进行切分
        if (attributeArray.length > 5)  // 忽略属性值少于5的错误数据  
        {
            category.set(attributeArray[3]);
            context.write(category, one);
        }
    }
}

Combiner

Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。

Combiner实质就是在本地端先运行的一次Reducer。

public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get();

        }
        context.write(key, new IntWritable(sum));
    }
}

Reducer

因为需要对, [视频类别, 视频数] 数组进行 排序 比较,因此首先定义一个二元组类,包含视频类别和视频数,分别对应first和second。并定义了Comparable接口,用于后面排序的需要。

public static class TwoTuple implements Comparable<TwoTuple>
{
    public  String first;
    public  int second;

    public TwoTuple(String a, int b)
    {
        first = a;
        second = b;
    }

    public String toString()
    {
        return "(" + first + ", " + second + ")";
    }

    @Override
    public int compareTo(TwoTuple tt)
    {
        return second - tt.second;
    }
}

使用Reducer实现提取Top N值的算法。

首先需要介绍 setup() 函数和 cleanup() 函数,与 reduce() 函数不同,不会根据key的数目多次执行,只会执行1次。

setup()此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高。

cleanup()此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高。

算法介绍

setup() 函数中,主要用来从配置中获取需要提取Top N的N值,并初始化 top[] 数组;

reduce() 函数中,计算出每个Category的视频总数后覆盖放入top[0]数组并进行排序;

cleanup() 函数中,将覆盖排序多次后的top数组写入output。

public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{

    int len;
    TwoTuple[] top;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        len =context.getConfiguration().getInt("N", 10);  // 从配置中获取top N的N值,若无则默认为10
        top = new TwoTuple[len + 1];
        for (int i=0; i<=len; i++)
        {
            top[i] = new TwoTuple("null", 0);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        for (int i = len; i > 0; i--)
        {
            context.write(new Text(top[i].first), new IntWritable(top[i].second));
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get();

        }
        add(key.toString(), sum);
    }

    private void add(String key, int val)
    {
        top[0].first = key;
        top[0].second = val;  // 替换掉最小值
        Arrays.sort(top); // 排序,从小到大顺序
    }
}

main&conf

public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    conf.addResource("classpath:/hadoop/core-site.xml");
    conf.addResource("classpath:/hadoop/hdfs-site.xml");
    conf.addResource("classpath:/hadoop/mapred-site.xml");
    conf.setInt("N", 5);
    // String[] otherArgs = new GenericOptionsParser(conf,
    // args).getRemainingArgs();
    String[] otherArgs = { "/youtube", "/youtube_category_Top5" };
    if (otherArgs.length != 2)
    {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "youtube");
    job.setJarByClass(FindMaxCategory.class);
    job.setMapperClass(CategoryMapper.class);
    job.setCombinerClass(SumReducer.class);
//		job.setReducerClass(SumReducer.class);  // 统计每个类别的总量
    job.setReducerClass(TopNReducer.class);  // 统计TopN的类别的总量
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Design for ROI

Web Design for ROI

Lance Loveday、Sandra Niehaus / New Riders Press / 2007-10-27 / USD 39.99

Your web site is a business--design it like one. Billions of dollars in spending decisions are influenced by web sites. So why aren't businesses laser-focused on designing their sites to maximize thei......一起来看看 《Web Design for ROI》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具