Log data makes up a large share of what internet companies process, and access logs in particular come with extra requirements; a typical one is to group the data by user and extract the relevant records for each user.
A very common task is to fetch a user's most recent visit records. For example, suppose there are two files:
1. User info file
user id    user name    device id
1          kedai        1
1          yinwen       2
2          haha         3
3          zhenhao      4
2. Device visit log
device id    visit time             visit ip
1            2019-01-05 01:01:48    192.168.1.39
3            2019-01-04 02:20:13    192.168.1.79
3            2019-01-04 01:55:16    192.168.1.32
9            2019-01-04 06:16:10    192.168.1.83
1            2019-01-03 23:01:43    192.168.1.70
4            2019-01-04 23:16:42    192.168.1.75
5            2019-01-05 01:49:02    192.168.1.56
6            2019-01-04 05:56:26    192.168.1.37
The task: retrieve the most recent visit records for each user, keeping in mind that one user may have several device ids.
Approach 1:
1. A single job is enough. Use a map-side join: load the small user file into memory once, during setup().
2. The rest is an ordinary map-reduce pass: define a sortable bean class, use the bean as the map output key, and cap the number of records emitted per user in the reduce phase. The full code is given below.
Approach 2:
1. Use two jobs. In job01, the map phase reads both files and emits every record keyed by device id.
2. In job01's reduce phase, merge the records that share a device id, i.e. attach the user info to the visit records.
3. In job02's map phase, build the same kind of sortable bean class and emit the bean instance as the key with the user id as the value.
4. In job02's reduce phase, filter on the value (user id) and keep only the first 20 records per user.
Approach 1 fits the case of joining a small file with a large one, since the small file has to fit in memory; approach 2 is the more general procedure. Only approach 1 is implemented in full in this post; a sketch of approach 2's join job follows this comparison.
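As a rough illustration of what job01 of approach 2 might look like, here is a minimal sketch of a reduce-side join on device id. The class names (DeviceJoin, DeviceJoinMapper, DeviceJoinReducer) and the file-name check are illustrative placeholders, not code from the original article:

package hadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of job01 for approach 2 (class and file names are illustrative only):
// the mapper reads both files, tags each record with its origin and keys it by device id;
// the reducer joins the user info onto the visit records of that device id.
public class DeviceJoin {

    public static class DeviceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String[] fields = value.toString().split("\t");
            if (fileName.startsWith("user")) {
                // user file: userId \t userName \t deviceId  ->  key = deviceId
                context.write(new Text(fields[2]), new Text("U\t" + fields[0] + "\t" + fields[1]));
            } else {
                // visit log: deviceId \t datetime \t ip      ->  key = deviceId
                context.write(new Text(fields[0]), new Text("V\t" + fields[1] + "\t" + fields[2]));
            }
        }
    }

    public static class DeviceJoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String userInfo = null;               // "userId \t userName" for this device id
            List<String> visits = new ArrayList<>();
            for (Text v : values) {
                String[] fields = v.toString().split("\t", 2);
                if ("U".equals(fields[0])) {
                    userInfo = fields[1];
                } else {
                    visits.add(fields[1]);        // "datetime \t ip"
                }
            }
            if (userInfo == null) {
                return;                           // device id with no registered user, e.g. device 9
            }
            for (String visit : visits) {
                // joined record: userId \t userName \t deviceId \t datetime \t ip
                context.write(new Text(userInfo), new Text(key.toString() + "\t" + visit));
            }
        }
    }
}

job02 would then read the joined output, build a VisitBean per record as the map output key with the user id as the value, and cap the per-user output in its reducer, much like the reducer shown for approach 1 below.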
Code for approach 1:
package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

public class UserVisitSort {

    public static class UserVisitSortMap extends Mapper<LongWritable, Text, VisitBean, Text> {

        // deviceId -> "userId \t userName", loaded once per map task
        private HashMap<String, String> user = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException {
            String filename = "/mapreduce/src/main/resources/joinfiles/user.txt";
            BufferedReader br = new BufferedReader(new FileReader(filename));
            String line;
            while ((line = br.readLine()) != null) {
                String[] strings = line.split("\t");
                user.put(strings[2], strings[0] + "\t" + strings[1]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            // visit log line: deviceId \t datetime \t ip
            String[] fields = values.toString().split("\t");
            String userInfoLine = user.get(fields[0]);
            if (userInfoLine != null && !"".equals(userInfoLine)) {
                String[] userInfo = userInfoLine.split("\t");
                // VisitBean(userId, deviceId, datetime, ip, name)
                VisitBean visitBean = new VisitBean(Integer.parseInt(userInfo[0]),
                        Integer.parseInt(fields[0]), fields[1], fields[2], userInfo[1]);
                context.write(visitBean, new Text(userInfo[0]));
            }
        }
    }

    public static class UserVisitSortReduce extends Reducer<VisitBean, Text, Text, VisitBean> {

        // userId -> number of records already written for that user
        private HashMap<String, Integer> mapCount = new HashMap<>();

        @Override
        protected void reduce(VisitBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                Integer count = mapCount.get(v.toString());
                if (count == null) {
                    context.write(v, key);
                    mapCount.put(v.toString(), 1);
                } else if (count < 10) {
                    // keep at most 10 records per user
                    context.write(v, key);
                    mapCount.put(v.toString(), count + 1);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");
        FileSystem fileSystem = FileSystem.get(conf);
        // remove the output directory of a previous run
        if (fileSystem.isDirectory(outPath)) {
            fileSystem.delete(outPath, true);
        }

        job.setJarByClass(UserVisitSort.class);
        job.setMapperClass(UserVisitSortMap.class);
        job.setReducerClass(UserVisitSortReduce.class);

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
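Note that setup() above reads user.txt from a hard-coded path on the local file system, which is fine when the job runs in local mode. On a real cluster the usual pattern is to ship the small file with the job via the distributed cache; a minimal sketch of the two changes, assuming the user file has been uploaded to HDFS at a placeholder location, would be:

// In main(), before submitting the job (needs java.net.URI):
job.addCacheFile(new URI("hdfs:///joinfiles/user.txt"));

// In UserVisitSortMap.setup(), open the localized copy by its file name
// instead of the hard-coded local path:
URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(
        new FileReader(new Path(cacheFiles[0]).getName()));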
The bean class:
package hadoop;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class VisitBean implements WritableComparable<VisitBean> {

    private Integer userId;
    private Integer deviceId;
    private String datetime;
    private String name;
    private String ip;

    // Hadoop needs the no-arg constructor for deserialization
    public VisitBean() {}

    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip, String name) {
        this.userId = userId;
        this.deviceId = deviceId;
        this.datetime = datetime;
        this.ip = ip;
        this.name = name;
    }

    @Override
    public int compareTo(VisitBean o) {
        if (this.userId.equals(o.getUserId())) {
            // same user: sort by visit time descending
            return o.getDatetime().compareTo(this.datetime);
        }
        // different users: sort by user id ascending
        return this.userId.compareTo(o.getUserId());
    }

    // write and readFields must serialize the fields in the same order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.userId);
        dataOutput.writeInt(this.deviceId);
        dataOutput.writeUTF(this.datetime);
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.name);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readInt();
        this.deviceId = dataInput.readInt();
        this.datetime = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.name = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public Integer getUserId() { return userId; }
    public void setUserId(Integer userId) { this.userId = userId; }

    public Integer getDeviceId() { return deviceId; }
    public void setDeviceId(Integer deviceId) { this.deviceId = deviceId; }

    public String getDatetime() { return datetime; }
    public void setDatetime(String datetime) { this.datetime = datetime; }

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
Note that the number of reduce tasks has to stay at 1 for this to work; if you want the output split into per-group files instead, you also need to set a Partitioner (sketched below).
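For completeness, a partitioner that keeps all of a user's records in the same reduce task (and hence the same output file) could look like the sketch below; the class name UserIdPartitioner and the decision to partition on user id are assumptions, not part of the original code.

package hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: route all records of a user to the same reducer / output file,
// so the per-user counting in UserVisitSortReduce still works with several reduce tasks.
public class UserIdPartitioner extends Partitioner<VisitBean, Text> {
    @Override
    public int getPartition(VisitBean key, Text value, int numPartitions) {
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

The driver would then register it with job.setPartitionerClass(UserIdPartitioner.class) and raise job.setNumReduceTasks(n); with the single reduce task used above, the default partitioner is sufficient.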
The output of a run looks like this:
1 {"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"kedai","name":"192.168.1.94","userId":1} 1 {"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"kedai","name":"192.168.1.23","userId":1} 1 {"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"kedai","name":"192.168.1.40","userId":1} 1 {"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"kedai","name":"192.168.1.52","userId":1} 1 {"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"kedai","name":"192.168.1.11","userId":1} 1 {"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"kedai","name":"192.168.1.33","userId":1} 1 {"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1} 1 {"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"kedai","name":"192.168.1.38","userId":1} 1 {"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"kedai","name":"192.168.1.51","userId":1} 1 {"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1} 2 {"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"haha","name":"192.168.1.33","userId":2} 2 {"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"haha","name":"192.168.1.95","userId":2} 2 {"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"haha","name":"192.168.1.74","userId":2} 2 {"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"haha","name":"192.168.1.84","userId":2} 2 {"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"haha","name":"192.168.1.91","userId":2} 2 {"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"haha","name":"192.168.1.32","userId":2} 2 {"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"haha","name":"192.168.1.81","userId":2} 2 {"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"haha","name":"192.168.1.73","userId":2} 2 {"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"haha","name":"192.168.1.26","userId":2} 2 {"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"haha","name":"192.168.1.92","userId":2}
The test data used for this run is provided in the attachment.