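Summary: This blog post shows how to analyze YouTube data with Hadoop MapReduce. It runs several analyses over the dataset to extract useful information, such as the top 10 ranked videos on YouTube and the users who uploaded the most videos. Each record in the dataset has the following tab-separated columns: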
Column 1: Video ID, 11 characters long.
Column 2: Uploader of the video.
Column 3: Interval between the date YouTube was established and the date the video was uploaded.
Column 4: Category of the video.
Column 5: Length of the video.
Column 6: Number of views of the video.
Column 7: Rating of the video.
Column 8: Number of ratings given to the video.
Column 9: Number of comments on the video.
Column 10: IDs of the videos related to the uploaded video.
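To make the column layout concrete, here is a minimal sketch of parsing one tab-separated row in plain Java. The `VideoRecord` class and its field names are hypothetical and not part of the original job. The sketch also assumes that the related video IDs occupy every field from the 10th onward, and that a row needs at least 6 fields to be usable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for a few columns of one dataset row.
final class VideoRecord {
    final String videoId;          // column 1
    final String uploader;         // column 2
    final String category;         // column 4
    final long views;              // column 6
    final List<String> relatedIds; // column 10 onward (assumption)

    private VideoRecord(String[] f) {
        videoId = f[0];
        uploader = f[1];
        category = f[3];
        views = Long.parseLong(f[5]);
        relatedIds = f.length > 9
                ? Arrays.asList(Arrays.copyOfRange(f, 9, f.length))
                : Collections.<String>emptyList();
    }

    // Returns null for rows that are too short or whose view count is not a number.
    static VideoRecord parse(String line) {
        String[] f = line.split("\t");
        if (f.length < 6) {
            return null;
        }
        try {
            return new VideoRecord(f);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

Calling `VideoRecord.parse(line)` on a complete row yields an object whose `category` field is the value the analysis below groups by; an incomplete row yields `null` and can be skipped.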
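Problem 1: Find the top 5 video categories
Split each line of data into fields and count the number of videos in each category (Column 4). Some records in the dataset are incomplete, so rows that yield too few fields after the split are skipped: the mapper keeps only rows with more than 5 fields.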
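    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1); // each record counts as 1
        private Text category = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] attributeArray = value.toString().split("\t"); // split the line on tabs
            if (attributeArray.length > 5) { // skip incomplete rows with 5 or fewer fields
                category.set(attributeArray[3]); // Column 4 is the category
                context.write(category, one);
            }
        }
    }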
    import org.apache.hadoop.mapreduce.Reducer;

    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) { // add up the counts emitted for this category
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
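The mapper and the summing reducer together amount to a grouped count. As a rough way to check that logic without a cluster, the sketch below performs the same split, filter, and sum in plain Java. `CategoryCountSketch` and the sample rows are invented for illustration; only the `length > 5` filter and the use of index 3 come from the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CategoryCountSketch {
    // Counts rows per category, mimicking map (emit <category, 1>) and reduce (sum).
    static Map<String, Integer> countCategories(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            if (fields.length > 5) {                      // same filter as the mapper
                counts.merge(fields[3], 1, Integer::sum); // same sum as the reducer
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "id1\tu1\t1\tMusic\t10\t100\t4.0\t2\t1",
                "id2\tu2\t2\tComedy\t10\t100\t4.0\t2\t1",
                "id3\tu3\t3\tMusic\t10\t100\t4.0\t2\t1",
                "id4\tu4\t4"); // incomplete row, skipped
        System.out.println(countCategories(lines)); // prints {Comedy=1, Music=2}
    }
}
```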
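Because the array of [video category, video count] pairs has to be sorted and compared, we first define a two-tuple class that holds the category and the count as first and second respectively. It implements the Comparable interface, which the sorting step later relies on.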
    public static class TwoTuple implements Comparable<TwoTuple> {
        public String first; // video category
        public int second;   // number of videos

        public TwoTuple(String a, int b) {
            first = a;
            second = b;
        }

        @Override
        public String toString() {
            return "(" + first + ", " + second + ")";
        }

        @Override
        public int compareTo(TwoTuple tt) {
            return Integer.compare(second, tt.second); // ascending by count, without overflow
        }
    }
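To see how a Comparable implementation drives `Arrays.sort`, here is a small standalone sketch. `PairSortSketch`, `Pair`, and `subtractCompare` are hypothetical names used only for this illustration. It also shows why `Integer.compare` is generally the safer way to compare two ints: plain subtraction can overflow when the operands have opposite signs, although for non-negative video counts both forms give the same ordering.

```java
import java.util.Arrays;

final class PairSortSketch {
    // Hypothetical stand-in for the two-tuple: a name plus a count.
    static final class Pair implements Comparable<Pair> {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(Pair other) {
            return Integer.compare(second, other.second); // ascending by count
        }
    }

    // Subtraction-based comparison, shown only to demonstrate the overflow risk.
    static int subtractCompare(int a, int b) {
        return a - b;
    }

    // Returns the names ordered from smallest to largest count.
    static String[] sortedNames(Pair[] pairs) {
        Pair[] copy = pairs.clone();
        Arrays.sort(copy); // uses compareTo
        String[] names = new String[copy.length];
        for (int i = 0; i < copy.length; i++) {
            names[i] = copy[i].first;
        }
        return names;
    }
}
```

Sorting the pairs (Music, 3), (Comedy, 1), (Sports, 2) with `sortedNames` gives Comedy, Sports, Music, so the largest count ends up in the last slot of the array.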
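The Top N extraction is implemented in the Reducer.
First, the setup() and cleanup() functions need to be introduced alongside reduce(). Hadoop calls reduce() once per key, whereas setup() runs once before the first key is processed and cleanup() runs once after the last key.
In setup(), the N of Top N is read from the configuration and the top[] array is initialized.
In reduce(), the counts for one category are summed and the total is inserted into top[].
In cleanup(), the entries kept in top[] are written out from largest to smallest.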
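The roles of setup(), reduce(), and cleanup() can be traced in plain Java without Hadoop. The sketch below is a simplified stand-in: `TopNLifecycleSketch` and `Entry` are hypothetical names, the three methods are called by hand rather than by the framework, and slot 0 of the array is used as a scratch slot that is overwritten and re-sorted on every insertion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TopNLifecycleSketch {
    static final class Entry implements Comparable<Entry> {
        String name;
        int count;

        Entry(String name, int count) {
            this.name = name;
            this.count = count;
        }

        @Override
        public int compareTo(Entry other) {
            return Integer.compare(count, other.count);
        }
    }

    private int n;
    private Entry[] top; // top[0] is scratch, top[1..n] hold the running Top N

    // Called once before any key: allocate n + 1 placeholder slots.
    void setup(int n) {
        this.n = n;
        top = new Entry[n + 1];
        for (int i = 0; i <= n; i++) {
            top[i] = new Entry("null", 0);
        }
    }

    // Called once per key: overwrite the smallest slot, then re-sort ascending.
    void reduce(String name, int total) {
        top[0].name = name;
        top[0].count = total;
        Arrays.sort(top);
    }

    // Called once after the last key: read the slots from largest to smallest.
    List<String> cleanup() {
        List<String> out = new ArrayList<>();
        for (int i = n; i > 0; i--) {
            out.add(top[i].name + "=" + top[i].count);
        }
        return out;
    }
}
```

For example, after `setup(2)` and the calls `reduce("Music", 5)`, `reduce("Comedy", 1)`, `reduce("Sports", 9)`, `reduce("News", 3)`, `cleanup()` returns the list `[Sports=9, Music=5]`.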
    import java.util.Arrays;

    public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int len;        // the N of Top N
        TwoTuple[] top; // top[0] is a scratch slot; top[1..len] hold the current Top N

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            len = context.getConfiguration().getInt("N", 10); // read N from the configuration, default 10
            top = new TwoTuple[len + 1];
            for (int i = 0; i <= len; i++) {
                top[i] = new TwoTuple("null", 0); // placeholder entries
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = len; i > 0; i--) { // largest count first
                if (top[i].second > 0) { // skip placeholders left when fewer than N keys were seen
                    context.write(new Text(top[i].first), new IntWritable(top[i].second));
                }
            }
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            add(key.toString(), sum);
        }

        private void add(String key, int val) {
            top[0].first = key;
            top[0].second = val; // overwrite the smallest entry
            Arrays.sort(top);    // re-sort in ascending order
        }
    }
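The reducer above re-sorts its whole array for every key, which is fine for a small N. As an alternative technique (not the one used in this post), a bounded min-heap keeps the N largest totals and only evicts the current minimum. The sketch below shows that idea in plain Java; `HeapTopNSketch` and `topN` are hypothetical names, and the order of entries with equal counts is left unspecified.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class HeapTopNSketch {
    // Returns up to n "name=count" strings, largest count first.
    static List<String> topN(Map<String, Integer> totals, int n) {
        Comparator<Map.Entry<String, Integer>> byCount = Map.Entry.comparingByValue();
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(byCount);
        for (Map.Entry<String, Integer> e : totals.entrySet()) {
            // Copy the entry so the heap does not depend on the source map.
            heap.offer(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            if (heap.size() > n) {
                heap.poll(); // evict the smallest total seen so far
            }
        }
        List<Map.Entry<String, Integer>> kept = new ArrayList<>(heap);
        kept.sort(byCount.reversed()); // largest first
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : kept) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }
}
```

With the totals Music=5, Comedy=1, Sports=9, News=3 and n = 2, `topN` returns `[Sports=9, Music=5]`, the same result as the array-based approach.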
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.setInt("N", 5); // extract the top 5

        // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String[] otherArgs = { "/youtube", "/youtube_category_Top5" }; // hard-coded input and output paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: FindMaxCategory <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "youtube"); // replaces the deprecated new Job(conf, name)
        job.setJarByClass(FindMaxCategory.class);
        job.setMapperClass(CategoryMapper.class);
        job.setCombinerClass(SumReducer.class);
        // job.setReducerClass(SumReducer.class); // total count for every category
        job.setReducerClass(TopNReducer.class);   // counts for the Top N categories only
        job.setNumReduceTasks(1); // a global Top N needs all keys to reach one reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputDirRecursive(job, true);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
