内容简介:这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。
这篇博客是关于如何在Hadoop MapReduce中进行YouTube数据分析的。
使用该数据集执行一些分析,并将提取一些有用的信息,例如YouTube上排名前10位的视频,他们上传了最多的视频。
数据
数据展示
数据说明
Column 1: Video id of 11 characters.
Column 2: uploader of the video
Column 3: Interval between the day of establishment of Youtube and the date of uploading of the video.
Column 4: Category of the video.
Column 5: Length of the video.
Column 6: Number of views for the video.
Column 7: Rating on the video.
Column 8: Number of ratings given for the video
Column 9: Number of comments done on the videos.
Column 10: Related video ids with the uploaded video.
问题1:寻找Top 5视频类别
Mapper
对每一行数据进行划分,统计各个视频类别的数量(Column 4)。数据集中部分数据缺失,因此忽略了划分后少于5个属性的数据。
public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); // 值为1 private Text category = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] attributeArray = value.toString().split("\t");// 对字符串进行切分 if (attributeArray.length > 5) // 忽略属性值少于5的错误数据 { category.set(attributeArray[3]); context.write(category, one); } } }
Combiner
Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
Combiner实质就是在本地端先运行的一次Reducer。
public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
Reducer
因为需要对, [视频类别, 视频数]
数组进行 排序 比较,因此首先定义一个二元组类,包含视频类别和视频数,分别对应first和second。并定义了Comparable接口,用于后面排序的需要。
public static class TwoTuple implements Comparable<TwoTuple> { public String first; public int second; public TwoTuple(String a, int b) { first = a; second = b; } public String toString() { return "(" + first + ", " + second + ")"; } @Override public int compareTo(TwoTuple tt) { return second - tt.second; } }
使用Reducer实现提取Top N值的算法。
首先需要介绍 setup()
函数和 cleanup()
函数,与 reduce()
函数不同,不会根据key的数目多次执行,只会执行1次。
setup()此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高。
cleanup()此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高。
算法介绍
在 setup()
函数中,主要用来从配置中获取需要提取Top N的N值,并初始化 top[]
数组;
在 reduce()
函数中,计算出每个Category的视频总数后覆盖放入top[0]数组并进行排序;
在 cleanup()
函数中,将覆盖排序多次后的top数组写入output。
public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int len; TwoTuple[] top; @Override protected void setup(Context context) throws IOException, InterruptedException { len =context.getConfiguration().getInt("N", 10); // 从配置中获取top N的N值,若无则默认为10 top = new TwoTuple[len + 1]; for (int i=0; i<=len; i++) { top[i] = new TwoTuple("null", 0); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = len; i > 0; i--) { context.write(new Text(top[i].first), new IntWritable(top[i].second)); } } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } add(key.toString(), sum); } private void add(String key, int val) { top[0].first = key; top[0].second = val; // 替换掉最小值 Arrays.sort(top); // 排序,从小到大顺序 } }
main&conf
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); conf.setInt("N", 5); // String[] otherArgs = new GenericOptionsParser(conf, // args).getRemainingArgs(); String[] otherArgs = { "/youtube", "/youtube_category_Top5" }; if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "youtube"); job.setJarByClass(FindMaxCategory.class); job.setMapperClass(CategoryMapper.class); job.setCombinerClass(SumReducer.class); // job.setReducerClass(SumReducer.class); // 统计每个类别的总量 job.setReducerClass(TopNReducer.class); // 统计TopN的类别的总量 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputDirRecursive(job, true); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- MapReduce实践 Uber数据分析
- 基于Spark的数据分析实践
- 数据分析之用户画像方法实践
- 干货 | 数据分析之用户画像方法与实践
- 神策数据徐美玲:数据分析之产品应用实践
- Uber永久定位系统实时数据分析过程实践!
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python金融衍生品大数据分析:建模、模拟、校准与对冲
【德】Yves Hilpisch(伊夫·希尔皮斯科) / 蔡立耑 / 电子工业出版社 / 2017-8 / 99.00
Python 在衍生工具分析领域占据重要地位,使机构能够快速、有效地提供定价、交易及风险管理的结果。《Python金融衍生品大数据分析:建模、模拟、校准与对冲》精心介绍了有效定价期权的四个领域:基于巿场定价的过程、完善的巿场模型、数值方法及技术。书中的内容分为三个部分。第一部分着眼于影响股指期权价值的风险,以及股票和利率的相关实证发现。第二部分包括套利定价理论、离散及连续时间的风险中性定价,并介绍......一起来看看 《Python金融衍生品大数据分析:建模、模拟、校准与对冲》 这本书的介绍吧!