MapReduce 编程实例
public class RatingMR {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "videorating");
job.setJarByClass(RatingMR.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
//设置 map ,reduce 类及 class
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//设置输入输出路径,这里指定的输路径是 HDFS,输出路径是 Linux 的本地文件系统
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://localhost:9000/youtubedata.txt"));
FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out"));
job.waitForCompletion(true);
}
//Map类,封装KV键值对 <k1,v1,k2,v2> , k1 是偏移量,v1 是这行数据,输出类型是这行文本 k2 ,浮点类型 v2
public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable> {
//创建Text,FloatWritable 对象
private Text video_name = new Text();
private FloatWritable rating = new FloatWritable();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] str = line.split("\t");
//使用正则表达式匹配字符串只应包含浮点数,过滤掉小于 7 的无效字段
if (str.length > 7) {
video_name.set(str[0]);
if (str[6].matches("\\d+.+")) {
//强制类型转换
float f = Float.parseFloat(str[6]);
rating.set(f);
}
}
context.write(video_name, rating);
}
}
public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
public void reduce(Text key, Iterable<FloatWritable> values,
Context context) throws IOException, InterruptedException {
float sum = 0;
int l = 0;
for (FloatWritable val : values) {
l += 1;
//累加value
sum += val.get();
}
sum = sum / l;
//取平均值
context.write(key, new FloatWritable(sum));
}
}
}
public class VideoCount {
//主函数
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "categories");
//设置生产 jar 包所使用的类
job.setJarByClass(VideoCount.class);
//设置 Map 类的输入输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置 Reduce 类的输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置 Map, Reduce 类的 class 参数
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//指定格式化用的类型
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://localhost:9000/youtubedata.txt"));
FileOutputFormat.setOutputPath(job, new Path("/tmp/hadoop/out2"));
//等待完成
job.waitForCompletion(true);
}
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
//构造文本类Text对象、IntWritable 对象,也可以直接以匿名函数的方式创建
private Text tx = new Text();
//map 的逻辑,使用tab“\ t”分隔符来分割行,并将值存储在String Array中,以使一行中的所有列都存储在字符串数组中
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//拿到每一行数据
String line = value.toString();
String[] str = line.split("\t");
//过滤字段
if (str.length > 5) {
tx.set(str[3]);
}
//输出key,value
context.write(tx, one);
}
}
//编写 reduce,接收 map 阶段传来的 kv 键值对,输出的类型和传进来的类型一致
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
//reduce????
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
//累加求和评分
for (IntWritable v : values) {
sum += v.get()
}
//写出去
context.write(key, new IntWritable(sum));
}
}
}
以上所述就是小编给大家介绍的《MapReduce编程实例》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- python编程(类变量和实例变量)
- Python网络编程socket模块实例解析
- Python网络编程之socket模块基础实例
- Python多进程并发与多线程并发编程实例总结
- 木兰编程语言重现——儿歌查询实例,引用模块、字符串列表操作
- 木兰编程语言重现:完善函数功能,常用字拆分数据处理实例
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。