MapReduce数据处理[经典面试题02]

栏目: 服务器 · 发布时间: 5年前

互联网公司处理的比较多的数据就是日志数据,其中访问日志处理要求就比较多一点,一般来说会有要求按照用户进行分组提取相关数据:

一个比较常见的需求就是,取用户最近的访问记录,例如有两分文件:

1、用户信息文件

用户id 用户名称  设备id

1   kedai  1
1  yinwen 2
2  haha   3
3  zhenhao    4

2、设备访问日志

设备id 访问日期  访问ip

1   2019-01-05 01:01:48    192.168.1.39
3  2019-01-04 02:20:13    192.168.1.79
3  2019-01-04 01:55:16    192.168.1.32
9  2019-01-04 06:16:10    192.168.1.83
1  2019-01-03 23:01:43    192.168.1.70
4  2019-01-04 23:16:42    192.168.1.75
5  2019-01-05 01:49:02    192.168.1.56
6  2019-01-04 05:56:26    192.168.1.37

取每个用户最近一次访问记录信息,一个用户可能有多个设备id。

思路一:

一、只需要一个job进行处理,使用mapjoin进行操作,把文件放到内存里面进行初始化一次

二、使用map-reduce进行处理即可,新建排序bean类,把bean类作为map的输出key,再reduce阶段进行限制输出数。

思路二:

一、使用两个job进行处理,job01阶段 map 分别读取两个文件,按照设备id作为key进行输出

二、 job01阶段 reduce按照设备id进行合并记录

三、job02 map阶段,使用 新建排序bean类,把bean实例作为key输出,用户id作为value。

四、按照value进行过滤获取前20;

思路一适合大小文件处理,思路二更适合一般处理过程。

思路一代码:

package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;


public class UserVisitSort {
    public static class UserVisitSortMap extends Mapper<LongWritable, Text,VisitBean, Text> {

        private HashMap<String,String> user = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException {
            String filename ="/mapreduce/src/main/resources/joinfiles/user.txt";

            BufferedReader br = new BufferedReader(new FileReader(filename));
            String line;
            while ((line=br.readLine())!=null){
                String[] strings =line.split("\t");
                user.put(strings[2],strings[0]+"\t"+strings[1]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException {
            VisitBean visitBean = null;
            String lines = values.toString();
            String[] fields = lines.split("\t");
            if(user.get(fields[0])!=null&&!"".equals(user.get(fields[0]))){
                String [] userInfo = user.get(fields[0]).split("\t");
                visitBean = new VisitBean(new Integer(userInfo[0]),new Integer(fields[0]),fields[1],userInfo[1],fields[2]);
                context.write(visitBean,new Text(userInfo[0]));
            }

        }
    }

    public static class UserVisitSortReduce extends Reducer<VisitBean,Text,Text,VisitBean> {
        HashMap<String,Integer> mapCount = new HashMap<>();

        @Override
        protected void reduce(VisitBean key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
            for(Text v:values){
                if(mapCount.get(v.toString())==null){
                    context.write(v,key);
                    mapCount.put(v.toString(),1);
                }else if(mapCount.get(v.toString())<10){
                    context.write(v,key);
                    mapCount.put(v.toString(),mapCount.get(v.toString())+1);
                }
            }
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");

        FileSystem fileSystem  = FileSystem.get(conf);

        if(fileSystem.isDirectory(outPath)){
            fileSystem.delete(outPath,true);
        }

        job.setJarByClass(UserVisitSort.class);

        job.setMapperClass(UserVisitSortMap.class);

        job.setReducerClass(UserVisitSortReduce.class);

        FileInputFormat.setInputPaths(job,inPath);
        FileOutputFormat.setOutputPath(job,outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        boolean res =job.waitForCompletion(true);

        System.exit(res?0:1);

    }

}

bean类

package hadoop;



import com.alibaba.fastjson.JSON;

import org.apache.hadoop.io.WritableComparable;



import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;



public class VisitBean implements WritableComparable<VisitBean> {



    private Integer userId;

    private Integer deviceId;

    private String datetime;

    private String name;

    private String ip;



    //需要空构造

    public VisitBean(){}



    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip,String name) {

        this.userId = userId;

        this.deviceId = deviceId;

        this.datetime = datetime;

        this.ip = ip;

        this.name = name;

    }



    @Override

    public int compareTo(VisitBean o) {

        if(this.userId==o.getUserId()){

          //按照日期降序

            return this.datetime.compareTo(o.getDatetime())>0?-1:1;

        }else {

          //按照用户id升序

            return this.userId>o.getUserId()?1:-1;

        }

    }



    //write、readFields的顺序需要一致

    @Override

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeInt(this.userId);

        dataOutput.writeInt(this.deviceId);

        dataOutput.writeUTF(this.datetime);

        dataOutput.writeUTF(this.ip);

        dataOutput.writeUTF(this.name);

    }



    @Override

    public void readFields(DataInput dataInput) throws IOException {

        this.userId = dataInput.readInt();

        this.deviceId = dataInput.readInt();

        this.datetime = dataInput.readUTF();

        this.ip = dataInput.readUTF();

        this.name = dataInput.readUTF();

    }



    @Override

    public String toString(){

        return JSON.toJSONString(this);

    }



    public Integer getUserId() {

        return userId;

    }



    public void setUserId(Integer userId) {

        this.userId = userId;

    }



    public Integer getDeviceId() {

        return deviceId;

    }



    public void setDeviceId(Integer deviceId) {

        this.deviceId = deviceId;

    }



    public String getDatetime() {

        return datetime;

    }



    public void setDatetime(String datetime) {

        this.datetime = datetime;

    }



    public String getIp() {

        return ip;

    }



    public void setIp(String ip) {

        this.ip = ip;

    }



    public String getName() {

        return name;

    }



    public void setName(String name) {

        this.name = name;

    }

}


注意reducetask数量需要设置成1,如果需要分组文件需要设置 Partitioner

执行结果如下:

1	{"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"kedai","name":"192.168.1.94","userId":1}

1	{"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"kedai","name":"192.168.1.23","userId":1}

1	{"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"kedai","name":"192.168.1.40","userId":1}

1	{"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"kedai","name":"192.168.1.52","userId":1}

1	{"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"kedai","name":"192.168.1.11","userId":1}

1	{"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"kedai","name":"192.168.1.33","userId":1}

1	{"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}

1	{"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"kedai","name":"192.168.1.38","userId":1}

1	{"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"kedai","name":"192.168.1.51","userId":1}

1	{"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}

2	{"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"haha","name":"192.168.1.33","userId":2}

2	{"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"haha","name":"192.168.1.95","userId":2}

2	{"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"haha","name":"192.168.1.74","userId":2}

2	{"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"haha","name":"192.168.1.84","userId":2}

2	{"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"haha","name":"192.168.1.91","userId":2}

2	{"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"haha","name":"192.168.1.32","userId":2}

2	{"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"haha","name":"192.168.1.81","userId":2}

2	{"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"haha","name":"192.168.1.73","userId":2}

2	{"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"haha","name":"192.168.1.26","userId":2}

2	{"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"haha","name":"192.168.1.92","userId":2}

测试数据见附件

本文由brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。

转载、引用前需联系作者,并署名作者且注明文章出处。

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序员成长的烦恼

程序员成长的烦恼

吴亮、周金桥、李春雷、周礼 / 华中科技大学出版社 / 2011-4 / 28.00元

还在犹豫该不该转行学编程?还在编程的道路上摸爬滚打?在追寻梦想的道路上你并不孤单,《程序员成长的烦恼》中的四位“草根”程序员也曾有过类似的困惑。看看油田焊接技术员出身的周金桥是如何成功转行当上程序员的,做过钳工、当过外贸跟单员的李春雷是如何自学编程的,打小在486计算机上学习编程的吴亮是如何一路坚持下来的,工作中屡屡受挫、频繁跳槽的周礼是如何找到出路的。 《程序员成长的烦恼》记录了他们一步一......一起来看看 《程序员成长的烦恼》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具