MapReduce数据处理[经典面试题02]

栏目: 服务器 · 发布时间: 5年前

互联网公司处理的比较多的数据就是日志数据,其中访问日志处理要求就比较多一点,一般来说会有要求按照用户进行分组提取相关数据:

一个比较常见的需求就是,取用户最近的访问记录,例如有两分文件:

1、用户信息文件

用户id 用户名称  设备id

1   kedai  1
1  yinwen 2
2  haha   3
3  zhenhao    4

2、设备访问日志

设备id 访问日期  访问ip

1   2019-01-05 01:01:48    192.168.1.39
3  2019-01-04 02:20:13    192.168.1.79
3  2019-01-04 01:55:16    192.168.1.32
9  2019-01-04 06:16:10    192.168.1.83
1  2019-01-03 23:01:43    192.168.1.70
4  2019-01-04 23:16:42    192.168.1.75
5  2019-01-05 01:49:02    192.168.1.56
6  2019-01-04 05:56:26    192.168.1.37

取每个用户最近一次访问记录信息,一个用户可能有多个设备id。

思路一:

一、只需要一个job进行处理,使用mapjoin进行操作,把文件放到内存里面进行初始化一次

二、使用map-reduce进行处理即可,新建排序bean类,把bean类作为map的输出key,再reduce阶段进行限制输出数。

思路二:

一、使用两个job进行处理,job01阶段 map 分别读取两个文件,按照设备id作为key进行输出

二、 job01阶段 reduce按照设备id进行合并记录

三、job02 map阶段,使用 新建排序bean类,把bean实例作为key输出,用户id作为value。

四、按照value进行过滤获取前20;

思路一适合大小文件处理,思路二更适合一般处理过程。

思路一代码:

package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;


public class UserVisitSort {
    public static class UserVisitSortMap extends Mapper<LongWritable, Text,VisitBean, Text> {

        private HashMap<String,String> user = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException {
            String filename ="/mapreduce/src/main/resources/joinfiles/user.txt";

            BufferedReader br = new BufferedReader(new FileReader(filename));
            String line;
            while ((line=br.readLine())!=null){
                String[] strings =line.split("\t");
                user.put(strings[2],strings[0]+"\t"+strings[1]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException {
            VisitBean visitBean = null;
            String lines = values.toString();
            String[] fields = lines.split("\t");
            if(user.get(fields[0])!=null&&!"".equals(user.get(fields[0]))){
                String [] userInfo = user.get(fields[0]).split("\t");
                visitBean = new VisitBean(new Integer(userInfo[0]),new Integer(fields[0]),fields[1],userInfo[1],fields[2]);
                context.write(visitBean,new Text(userInfo[0]));
            }

        }
    }

    public static class UserVisitSortReduce extends Reducer<VisitBean,Text,Text,VisitBean> {
        HashMap<String,Integer> mapCount = new HashMap<>();

        @Override
        protected void reduce(VisitBean key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
            for(Text v:values){
                if(mapCount.get(v.toString())==null){
                    context.write(v,key);
                    mapCount.put(v.toString(),1);
                }else if(mapCount.get(v.toString())<10){
                    context.write(v,key);
                    mapCount.put(v.toString(),mapCount.get(v.toString())+1);
                }
            }
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");

        FileSystem fileSystem  = FileSystem.get(conf);

        if(fileSystem.isDirectory(outPath)){
            fileSystem.delete(outPath,true);
        }

        job.setJarByClass(UserVisitSort.class);

        job.setMapperClass(UserVisitSortMap.class);

        job.setReducerClass(UserVisitSortReduce.class);

        FileInputFormat.setInputPaths(job,inPath);
        FileOutputFormat.setOutputPath(job,outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        boolean res =job.waitForCompletion(true);

        System.exit(res?0:1);

    }

}

bean类

package hadoop;



import com.alibaba.fastjson.JSON;

import org.apache.hadoop.io.WritableComparable;



import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;



public class VisitBean implements WritableComparable<VisitBean> {



    private Integer userId;

    private Integer deviceId;

    private String datetime;

    private String name;

    private String ip;



    //需要空构造

    public VisitBean(){}



    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip,String name) {

        this.userId = userId;

        this.deviceId = deviceId;

        this.datetime = datetime;

        this.ip = ip;

        this.name = name;

    }



    @Override

    public int compareTo(VisitBean o) {

        if(this.userId==o.getUserId()){

          //按照日期降序

            return this.datetime.compareTo(o.getDatetime())>0?-1:1;

        }else {

          //按照用户id升序

            return this.userId>o.getUserId()?1:-1;

        }

    }



    //write、readFields的顺序需要一致

    @Override

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeInt(this.userId);

        dataOutput.writeInt(this.deviceId);

        dataOutput.writeUTF(this.datetime);

        dataOutput.writeUTF(this.ip);

        dataOutput.writeUTF(this.name);

    }



    @Override

    public void readFields(DataInput dataInput) throws IOException {

        this.userId = dataInput.readInt();

        this.deviceId = dataInput.readInt();

        this.datetime = dataInput.readUTF();

        this.ip = dataInput.readUTF();

        this.name = dataInput.readUTF();

    }



    @Override

    public String toString(){

        return JSON.toJSONString(this);

    }



    public Integer getUserId() {

        return userId;

    }



    public void setUserId(Integer userId) {

        this.userId = userId;

    }



    public Integer getDeviceId() {

        return deviceId;

    }



    public void setDeviceId(Integer deviceId) {

        this.deviceId = deviceId;

    }



    public String getDatetime() {

        return datetime;

    }



    public void setDatetime(String datetime) {

        this.datetime = datetime;

    }



    public String getIp() {

        return ip;

    }



    public void setIp(String ip) {

        this.ip = ip;

    }



    public String getName() {

        return name;

    }



    public void setName(String name) {

        this.name = name;

    }

}


注意reducetask数量需要设置成1,如果需要分组文件需要设置 Partitioner

执行结果如下:

1	{"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"kedai","name":"192.168.1.94","userId":1}

1	{"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"kedai","name":"192.168.1.23","userId":1}

1	{"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"kedai","name":"192.168.1.40","userId":1}

1	{"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"kedai","name":"192.168.1.52","userId":1}

1	{"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"kedai","name":"192.168.1.11","userId":1}

1	{"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"kedai","name":"192.168.1.33","userId":1}

1	{"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}

1	{"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"kedai","name":"192.168.1.38","userId":1}

1	{"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"kedai","name":"192.168.1.51","userId":1}

1	{"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}

2	{"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"haha","name":"192.168.1.33","userId":2}

2	{"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"haha","name":"192.168.1.95","userId":2}

2	{"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"haha","name":"192.168.1.74","userId":2}

2	{"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"haha","name":"192.168.1.84","userId":2}

2	{"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"haha","name":"192.168.1.91","userId":2}

2	{"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"haha","name":"192.168.1.32","userId":2}

2	{"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"haha","name":"192.168.1.81","userId":2}

2	{"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"haha","name":"192.168.1.73","userId":2}

2	{"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"haha","name":"192.168.1.26","userId":2}

2	{"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"haha","name":"192.168.1.92","userId":2}

测试数据见附件

本文由brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。

转载、引用前需联系作者,并署名作者且注明文章出处。

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

连线

连线

加里·沃尔夫 / 黄锫坚 / 中国铁道出版社 / 2006-7 / 29.80元

简言之,是一个人和一本杂志的传奇……   放在大环境中,是一个时代的跌宕起伏……   一直大力地向所有人推荐这本书,只是因为,故事真的很精彩,是一个伟大而疯狂的时代所造就的一个不屈不挠的斗士——路易斯·罗塞托,还有这本举世皆知的杂志——《连线》。   通过本书,中国读者会知道,新经济热潮的主角不仅是比尔·盖茨、杨致远等技术和商业能人,还有一大批新文化的传教士和吹鼓手,比如路易斯·......一起来看看 《连线》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具