Hive、MapReduce、Spark分布式生成唯一数值型ID

栏目: 服务器 · 发布时间: 6年前

内容简介:Hive、MapReduce、Spark分布式生成唯一数值型ID

Hive、MapReduce、Spark分布式生成唯一数值型ID

在实际业务场景下,经常会遇到在Hive、MapReduce、Spark中需要生成唯一的数值型ID。

一般常用的做法有:

MapReduce中使用1个Reduce来生成;

Hive中使用row_number分析函数来生成,其实也是1个Reduce;

借助HBase或 Redis 或Zookeeper等其它框架的计数器来生成;

数据量不大的情况下,可以直接使用1和2方法来生成,但如果数据量巨大,1个Reduce处理起来就非常慢。

在数据量非常大的情况下,如果你仅仅需要唯一的数值型ID,注意:不是需要”连续的唯一的数值型ID”,那么可以考虑采用本文中介绍的方法,否则,请使用第3种方法来完成。

Spark中生成这样的非连续唯一数值型ID,非常简单,直接使用zipWithUniqueId()即可。

参考zipWithUniqueId()的方法,在MapReduce和Hive中,实现如下:

Hive、MapReduce、Spark分布式生成唯一数值型ID

在Spark中,zipWithUniqueId是通过使用分区Index作为每个分区ID的开始值,在每个分区内,ID增长的步长为该RDD的分区数,那么在MapReduce和Hive中,也可以照此思路实现,Spark中的分区数,即为MapReduce中的Map数,Spark分区的Index,即为Map Task的ID。Map数,可以通过JobConf的getNumMapTasks(),而Map Task ID,可以通过参数mapred.task.id获取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作为该Map Task的ID起始值。注意:这两个只均需要在Job运行时才能获取。另外,从图中也可以看出,每个分区/Map Task中的数据量不是绝对一致的,因此,生成的ID不是连续的。

下面的UDF可以在Hive中直接使用:

package com.lxw1234.hive.udf; 
  
import org.apache.hadoop.hive.ql.exec.MapredContext; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.udf.UDFType; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
import org.apache.hadoop.io.LongWritable; 
  
@UDFType(deterministic = false, stateful = true) 
public class RowSeq2 extends GenericUDF { 
     
    private static LongWritable result = new LongWritable(); 
    private static final char SEPARATOR = '_'; 
    private static final String ATTEMPT = "attempt"; 
    private long initID = 0l; 
    private int increment = 0; 
     
     
    @Override 
    public void configure(MapredContext context) { 
        increment = context.getJobConf().getNumMapTasks(); 
        if(increment == 0) { 
            throw new IllegalArgumentException("mapred.map.tasks is zero"); 
        } 
         
        initID = getInitId(context.getJobConf().get("mapred.task.id"),increment); 
        if(initID == 0l) { 
            throw new IllegalArgumentException("mapred.task.id"); 
        } 
         
        System.out.println("initID : " + initID + "  increment : " + increment); 
    } 
     
    @Override 
    public ObjectInspector initialize(ObjectInspector[] arguments) 
            throws UDFArgumentException { 
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 
    } 
  
    @Override 
    public Object evaluate(DeferredObject[] arguments) throws HiveException { 
        result.set(getValue()); 
        increment(increment); 
        return result; 
    } 
     
    @Override 
    public String getDisplayString(String[] children) { 
        return "RowSeq-func()"; 
    } 
     
    private synchronized void increment(int incr) { 
        initID += incr; 
    } 
     
    private synchronized long getValue() { 
        return initID; 
    } 
     
    //attempt_1478926768563_0537_m_000004_0 // return 0+1 
    private long getInitId (String taskAttemptIDstr,int numTasks) 
            throws IllegalArgumentException { 
        try { 
            String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); 
            if(parts.length == 6) { 
                if(parts[0].equals(ATTEMPT)) { 
                    if(!parts[3].equals("m") && !parts[3].equals("r")) { 
                        throw new Exception(); 
                    } 
                    long result = Long.parseLong(parts[4]); 
                    if(result >= numTasks) { //if taskid >= numtasks 
                        throw new Exception("TaskAttemptId string : " + taskAttemptIDstr 
                                + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); 
                    } 
                    return result + 1; 
                } 
            } 
        } catch (Exception e) {} 
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
                + " is not properly formed"); 
    } 
     
} 
  

有一张去重后的用户id(字符串类型)表,需要位每个用户id生成一个唯一的数值型seq:

ADD jar file:///tmp/udf.jar; 
CREATE temporary function seq2 as 'com.lxw1234.hive.udf.RowSeq2'; 
  
hive>> desc lxw_all_ids; 
OK 
id                      string                                       
Time taken: 0.074 seconds, Fetched: 1 row(s) 
hive> select * from lxw_all_ids limit 5; 
OK 
01779E7A06ABF5565A4982_cookie 
031E2D2408C29556420255_cookie 
03371ADA0B6E405806FFCD_cookie 
0517C4B701BC1256BFF6EC_cookie 
05F12ADE0E880455931C1A_cookie 
Time taken: 0.215 seconds, Fetched: 5 row(s) 
hive> select count(1) from lxw_all_ids; 
253402337 
  
hive> create table lxw_all_ids2 as select id,seq2() as seq from lxw_all_ids; 
… 
Hadoop job information for Stage-1: number of mappers: 27; number of reducers: 0 
… 
  
  
  

该Job使用了27个Map Task,没有使用Reduce,那么将会产生27个结果文件。

再看结果表中的数据:

hive> select * from lxw_all_ids2 limit 10; 
OK 
766CA2770527B257D332AA_cookie   1 
5A0492DB0000C557A81383_cookie   28 
8C06A5770F176E58301EEF_cookie   55 
6498F47B0BCAFE5842B83A_cookie   82 
6DA33CB709A23758428A44_cookie   109 
B766347B0D27925842AC2D_cookie   136 
5794357B050C99584251AC_cookie   163 
81D67A7B011BEA5842776C_cookie   190 
9D2F8EB40AEA525792347D_cookie   217 
BD21077B09F9E25844D2C1_cookie   244 
  
hive> select count(1),count(distinct seq) from lxw_all_ids2; 
253402337       253402337 
  

limit 10只从第一个结果文件,即MapTaskId为0的结果文件中拿了10条,这个Map中,start=1,increment=27,因此生成的ID如上所示。

count(1),count(distinct seq)的值相同,说明seq没有重复值,你可以试试max(seq),结果必然大于253402337,说明seq是”非连续唯一数值型ID“.


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

与孩子一起学编程

与孩子一起学编程

[美] 桑德Warren Sande、Carter Sande / 苏金国、姚曜 等 / 人民邮电出版社 / 2010-11 / 65.00元

一本老少咸宜的编程入门奇书!一册在手,你完全可以带着自己的孩子,跟随Sande父子组合在轻松的氛围中熟悉那些编程概念,如内存、循环、输入和输出、数据结构和图形用户界面等。这些知识一点儿也不高深,听起来备感亲切,书中言语幽默风趣而不失真义,让学习过程充满乐趣。细心的作者还配上了孩子们都喜欢的可爱漫画和经过运行测试的程序示例,教你用最易编写和最易理解的Python语言,写出你梦想中的游戏程序。 ......一起来看看 《与孩子一起学编程》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具