MapReduce实践 Youtube数据分析

栏目: 编程工具 · 发布时间: 5年前

内容简介:这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。

这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。

使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。

数据

数据下载

数据展示

MapReduce实践 Youtube数据分析

数据说明

Column 1: Video id of 11 characters.

Column 2: uploader of the video

Column 3: Interval between the day of establishment of Youtube and the date of uploading of the video.

Column 4: Category of the video.

Column 5: Length of the video.

Column 6: Number of views for the video.

Column 7: Rating on the video.

Column 8: Number of ratings given for the video

Column 9: Number of comments done on the videos.

Column 10: Related video ids with the uploaded video.

问题1:寻找Top 5视频类别

Mapper

对每一行数据进行划分,统计各个视频类别的数量(Column 4)。数据集中部分数据缺失,因此忽略了划分后少于5个属性的数据。

public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1); // 值为1
    private Text category = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        String[] attributeArray = value.toString().split("\t");// 对字符串进行切分
        if (attributeArray.length > 5)  // 忽略属性值少于5的错误数据  
        {
            category.set(attributeArray[3]);
            context.write(category, one);
        }
    }
}

Combiner

Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。

Combiner实质就是在本地端先运行的一次Reducer。

public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get();

        }
        context.write(key, new IntWritable(sum));
    }
}

Reducer

因为需要对, [视频类别, 视频数] 数组进行 排序 比较,因此首先定义一个二元组类,包含视频类别和视频数,分别对应first和second。并定义了Comparable接口,用于后面排序的需要。

public static class TwoTuple implements Comparable<TwoTuple>
{
    public  String first;
    public  int second;

    public TwoTuple(String a, int b)
    {
        first = a;
        second = b;
    }

    public String toString()
    {
        return "(" + first + ", " + second + ")";
    }

    @Override
    public int compareTo(TwoTuple tt)
    {
        return second - tt.second;
    }
}

使用Reducer实现提取Top N值的算法。

首先需要介绍 setup() 函数和 cleanup() 函数,与 reduce() 函数不同,不会根据key的数目多次执行,只会执行1次。

setup()此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高。

cleanup()此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高。

算法介绍

setup() 函数中,主要用来从配置中获取需要提取Top N的N值,并初始化 top[] 数组;

reduce() 函数中,计算出每个Category的视频总数后覆盖放入top[0]数组并进行排序;

cleanup() 函数中,将覆盖排序多次后的top数组写入output。

public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{

    int len;
    TwoTuple[] top;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        len =context.getConfiguration().getInt("N", 10);  // 从配置中获取top N的N值,若无则默认为10
        top = new TwoTuple[len + 1];
        for (int i=0; i<=len; i++)
        {
            top[i] = new TwoTuple("null", 0);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        for (int i = len; i > 0; i--)
        {
            context.write(new Text(top[i].first), new IntWritable(top[i].second));
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
        {
            sum += val.get();

        }
        add(key.toString(), sum);
    }

    private void add(String key, int val)
    {
        top[0].first = key;
        top[0].second = val;  // 替换掉最小值
        Arrays.sort(top); // 排序,从小到大顺序
    }
}

main&conf

public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    conf.addResource("classpath:/hadoop/core-site.xml");
    conf.addResource("classpath:/hadoop/hdfs-site.xml");
    conf.addResource("classpath:/hadoop/mapred-site.xml");
    conf.setInt("N", 5);
    // String[] otherArgs = new GenericOptionsParser(conf,
    // args).getRemainingArgs();
    String[] otherArgs = { "/youtube", "/youtube_category_Top5" };
    if (otherArgs.length != 2)
    {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "youtube");
    job.setJarByClass(FindMaxCategory.class);
    job.setMapperClass(CategoryMapper.class);
    job.setCombinerClass(SumReducer.class);
//		job.setReducerClass(SumReducer.class);  // 统计每个类别的总量
    job.setReducerClass(TopNReducer.class);  // 统计TopN的类别的总量
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python金融衍生品大数据分析:建模、模拟、校准与对冲

Python金融衍生品大数据分析:建模、模拟、校准与对冲

【德】Yves Hilpisch(伊夫·希尔皮斯科) / 蔡立耑 / 电子工业出版社 / 2017-8 / 99.00

Python 在衍生工具分析领域占据重要地位,使机构能够快速、有效地提供定价、交易及风险管理的结果。《Python金融衍生品大数据分析:建模、模拟、校准与对冲》精心介绍了有效定价期权的四个领域:基于巿场定价的过程、完善的巿场模型、数值方法及技术。书中的内容分为三个部分。第一部分着眼于影响股指期权价值的风险,以及股票和利率的相关实证发现。第二部分包括套利定价理论、离散及连续时间的风险中性定价,并介绍......一起来看看 《Python金融衍生品大数据分析:建模、模拟、校准与对冲》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试