MapReduce Data Processing [Classic Interview Question 02]

Category: Servers · Published: 5 years ago

A large share of the data internet companies process is log data. Access logs in particular come with extra processing requirements; a typical one is to group records by user and extract the relevant data.

A fairly common task is to retrieve each user's most recent visit records. For example, suppose there are two files:

1. User information file

userId    userName    deviceId

1         kedai       1
1         yinwen      2
2         haha        3
3         zhenhao     4

2. Device access log

deviceId    visitTime              visitIp

1           2019-01-05 01:01:48    192.168.1.39
3           2019-01-04 02:20:13    192.168.1.79
3           2019-01-04 01:55:16    192.168.1.32
9           2019-01-04 06:16:10    192.168.1.83
1           2019-01-03 23:01:43    192.168.1.70
4           2019-01-04 23:16:42    192.168.1.75
5           2019-01-05 01:49:02    192.168.1.56
6           2019-01-04 05:56:26    192.168.1.37

The requirement: retrieve the most recent visit records for each user, keeping in mind that a single user may have multiple device ids.

Approach 1:

1. Only one job is needed. Use a map-side join: load the user file into memory once, during mapper setup.

2. Handle the rest with plain map-reduce: define a sortable bean class, emit it as the map output key, and cap the number of records emitted per user in the reduce phase.

Approach 2:

1. Use two jobs. In job01's map phase, read both files and emit every record keyed by device id.

2. In job01's reduce phase, join the records that share a device id.

3. In job02's map phase, wrap each joined record in the sortable bean class and emit the bean instance as the key, with the user id as the value.

4. In job02's reduce phase, filter by value (user id) and keep only the first 20 records per user.

Approach 1 suits joining a large file with a small one (the user file has to fit in memory); Approach 2 is the more general pattern.
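The post only provides code for Approach 1, so here is a minimal, illustrative sketch of job01 from Approach 2 (a reduce-side join keyed by device id). The class names, the source-tagging scheme, and the assumption that the user file name contains "user" are mine, not the original author's:

package hadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical job01 for Approach 2: tag each record with its source file,
// key it by deviceId, and join the two sides in the reducer. Driver setup is omitted.
public class DeviceJoin {

    public static class DeviceJoinMap extends Mapper<LongWritable, Text, Text, Text> {
        private boolean isUserFile;

        @Override
        protected void setup(Context context) {
            // Decide which input file this split belongs to (assumes the user file name contains "user").
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            isUserFile = fileName.contains("user");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (isUserFile) {
                // user file: userId \t userName \t deviceId  ->  key = deviceId
                context.write(new Text(fields[2]), new Text("U\t" + fields[0] + "\t" + fields[1]));
            } else {
                // visit log: deviceId \t datetime \t ip      ->  key = deviceId
                context.write(new Text(fields[0]), new Text("V\t" + fields[1] + "\t" + fields[2]));
            }
        }
    }

    public static class DeviceJoinReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String userInfo = null;
            List<String> visits = new ArrayList<>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("U\t")) {
                    userInfo = s.substring(2);      // userId \t userName
                } else {
                    visits.add(s.substring(2));     // datetime \t ip
                }
            }
            if (userInfo == null) {
                return;                             // device has no owner, drop it
            }
            // Emit: deviceId  ->  userId \t userName \t datetime \t ip
            for (String visit : visits) {
                context.write(key, new Text(userInfo + "\t" + visit));
            }
        }
    }
}

Job02 would then read job01's output, wrap each joined line in the sortable bean described in Approach 1, and cap the records emitted per user in its reducer.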

Code for Approach 1:

package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;


public class UserVisitSort {

    public static class UserVisitSortMap extends Mapper<LongWritable, Text, VisitBean, Text> {

        // deviceId -> "userId \t userName", loaded once per map task (map-side join)
        private HashMap<String, String> user = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException {
            // Read the small user file into memory before any map() call.
            String filename = "/mapreduce/src/main/resources/joinfiles/user.txt";

            BufferedReader br = new BufferedReader(new FileReader(filename));
            String line;
            while ((line = br.readLine()) != null) {
                String[] strings = line.split("\t");
                user.put(strings[2], strings[0] + "\t" + strings[1]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            // Each input line is a visit record: deviceId \t datetime \t ip
            String[] fields = values.toString().split("\t");
            String userInfoLine = user.get(fields[0]);
            if (userInfoLine != null && !"".equals(userInfoLine)) {
                String[] userInfo = userInfoLine.split("\t");
                // Constructor order is (userId, deviceId, datetime, ip, name).
                VisitBean visitBean = new VisitBean(Integer.parseInt(userInfo[0]), Integer.parseInt(fields[0]),
                        fields[1], fields[2], userInfo[1]);
                context.write(visitBean, new Text(userInfo[0]));
            }
        }
    }

    public static class UserVisitSortReduce extends Reducer<VisitBean, Text, Text, VisitBean> {

        // Per-user output counter. It survives across reduce() calls because it is an
        // instance field and the job runs with a single reduce task.
        HashMap<String, Integer> mapCount = new HashMap<>();

        @Override
        protected void reduce(VisitBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Keys arrive sorted by userId ascending, then datetime descending (see VisitBean.compareTo),
            // so the first 10 records seen for a user are its 10 most recent visits.
            for (Text v : values) {
                Integer count = mapCount.get(v.toString());
                if (count == null) {
                    context.write(v, key);
                    mapCount.put(v.toString(), 1);
                } else if (count < 10) {
                    context.write(v, key);
                    mapCount.put(v.toString(), count + 1);
                }
            }
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");

        FileSystem fileSystem = FileSystem.get(conf);

        // Delete a previous output directory so the job can be rerun.
        if (fileSystem.isDirectory(outPath)) {
            fileSystem.delete(outPath, true);
        }

        job.setJarByClass(UserVisitSort.class);

        job.setMapperClass(UserVisitSortMap.class);
        job.setReducerClass(UserVisitSortReduce.class);

        // A single reducer is required for the per-user limit in the reducer (see the note below).
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);
    }

}
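One caveat about the map-side join above: setup() reads user.txt from a hard-coded local path, which only works when the job runs in local mode. A minimal sketch of a cluster-friendly variant, assuming the user file has been uploaded to HDFS (the path below is illustrative, and java.io.InputStreamReader must be imported in addition to the imports above):

        // Alternative setup(): read the small user file from HDFS instead of the local disk.
        @Override
        protected void setup(Context context) throws IOException {
            Path userFile = new Path("hdfs:///data/joinfiles/user.txt");    // assumed HDFS location
            FileSystem fs = userFile.getFileSystem(context.getConfiguration());
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(userFile)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] strings = line.split("\t");
                    user.put(strings[2], strings[0] + "\t" + strings[1]);   // deviceId -> userId \t userName
                }
            }
        }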

The bean class:

package hadoop;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class VisitBean implements WritableComparable<VisitBean> {

    private Integer userId;
    private Integer deviceId;
    private String datetime;
    private String name;
    private String ip;

    // A no-argument constructor is required for Hadoop deserialization.
    public VisitBean() {}

    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip, String name) {
        this.userId = userId;
        this.deviceId = deviceId;
        this.datetime = datetime;
        this.ip = ip;
        this.name = name;
    }

    @Override
    public int compareTo(VisitBean o) {
        if (this.userId.equals(o.getUserId())) {
            // Same user: sort by date descending (most recent first).
            return this.datetime.compareTo(o.getDatetime()) > 0 ? -1 : 1;
        } else {
            // Different users: sort by user id ascending.
            return this.userId > o.getUserId() ? 1 : -1;
        }
    }

    // write and readFields must serialize the fields in the same order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.userId);
        dataOutput.writeInt(this.deviceId);
        dataOutput.writeUTF(this.datetime);
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.name);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readInt();
        this.deviceId = dataInput.readInt();
        this.datetime = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.name = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(Integer deviceId) {
        this.deviceId = deviceId;
    }

    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
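Because every VisitBean key is unique, each reduce() call above receives exactly one record, which is why the reducer tracks a per-user count in a HashMap across calls. An alternative worth knowing (not used in the original code) is a grouping comparator that groups reduce input by userId only, so a single reduce() call iterates over all of one user's records, already sorted by date descending. A minimal sketch, assuming the VisitBean class above; the class name is illustrative:

// Hypothetical grouping comparator: group reduce input by userId only.
public class UserIdGroupingComparator extends org.apache.hadoop.io.WritableComparator {

    public UserIdGroupingComparator() {
        super(VisitBean.class, true);   // true: create key instances for comparison
    }

    @Override
    public int compare(org.apache.hadoop.io.WritableComparable a, org.apache.hadoop.io.WritableComparable b) {
        // Two beans compare as equal when they belong to the same user.
        return ((VisitBean) a).getUserId().compareTo(((VisitBean) b).getUserId());
    }
}

// In the driver:
//     job.setGroupingComparatorClass(UserIdGroupingComparator.class);

With this in place the reducer could simply emit the first 10 values of each call instead of maintaining a cross-call HashMap.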


Note: the number of reduce tasks must stay at 1 (set explicitly in the driver above); with several reducers the default HashPartitioner would scatter one user's records across reducers, breaking both the per-user ordering and the per-user output limit. If you need the output split into separate files by group, configure a Partitioner instead.
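For reference, a minimal sketch of such a Partitioner, assuming it is nested in UserVisitSort like the mapper and reducer (the class name and reducer count are illustrative):

    // Hypothetical Partitioner: send all records of one user to the same reducer,
    // so each output file contains a complete group of users.
    public static class UserIdPartitioner extends org.apache.hadoop.mapreduce.Partitioner<VisitBean, Text> {
        @Override
        public int getPartition(VisitBean key, Text value, int numPartitions) {
            return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // In the driver:
    //     job.setPartitionerClass(UserIdPartitioner.class);
    //     job.setNumReduceTasks(3);   // example value

Because each user's records still land on a single reducer, the per-user ordering and the 10-record limit continue to hold within each partition.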

The execution results are as follows:

1	{"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"192.168.1.94","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"192.168.1.23","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"192.168.1.40","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"192.168.1.52","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"192.168.1.11","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"192.168.1.33","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"192.168.1.89","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"192.168.1.38","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"192.168.1.51","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"192.168.1.89","name":"kedai","userId":1}
2	{"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"192.168.1.33","name":"haha","userId":2}
2	{"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"192.168.1.95","name":"haha","userId":2}
2	{"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"192.168.1.74","name":"haha","userId":2}
2	{"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"192.168.1.84","name":"haha","userId":2}
2	{"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"192.168.1.91","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"192.168.1.32","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"192.168.1.81","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"192.168.1.73","name":"haha","userId":2}
2	{"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"192.168.1.26","name":"haha","userId":2}
2	{"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"192.168.1.92","name":"haha","userId":2}

The test data is available as an attachment to the original post.

This article was written by brucelu and is licensed under the Creative Commons Attribution-ShareAlike 3.0 China Mainland license. Contact the author before reposting or quoting, credit the author, and cite the source.
