不久前,我们处理了一个用户工单,该工单对应的 HQL 如下所示:

        array(0.50, 0.70, 0.90, 0.95, 0.99)
    date = '20170207'
    and column1 = 'value1'
    and column2 = 'value2'
    and column3 = 'value3'
    and column4 = 'value4'
    and column5 = 'value5'

这个 HQL 看上去并不复杂,其目的不过是计算 column0 这个字段的几个近似分位点(percentile_approx),就一个 stage,应该会比较顺利地完成计算才对。不巧的是,其 MapReduce 任务在 reduce 阶段抛出如下异常:

Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":  
{},"value":{"_col0":[0.0,10000.0]}} at at
org.apache.hadoop.mapred.ReduceTask.runOldReducer( at at  
org.apache.hadoop.mapred.YarnChild$ at Method) at at at  
org.apache.hadoop.mapred.YarnChild.main( Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while  
processing row (tag=0) {"key":{},"value":{"_col0":[0.0,10000.0]}} at  
... 7 more Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.IndexOutOfBoundsException: Index: 2, Size: 2 at
org.apache.hadoop.hive.ql.exec.GroupByOperator.process( at ... 7 more Caused by: java.lang.IndexOutOfBoundsException: Index: 2,  
Size: 2 at java.util.ArrayList.rangeCheck( at java.util.ArrayList.get( at  
org.apache.hadoop.hive.ql.udf.generic.NumericHistogram.merge( at  
at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.aggregate( at  
org.apache.hadoop.hive.ql.exec.GroupByOperator.updateAggregations( at  
org.apache.hadoop.hive.ql.exec.GroupByOperator.processAggr( at  
org.apache.hadoop.hive.ql.exec.GroupByOperator.processKey( at  
org.apache.hadoop.hive.ql.exec.GroupByOperator.process( ... 8 more

顺着 stacktrace,我跟踪了一下 NumericHistogram(为了支持 percentile_approx,于 2010 年引入) 的实现。这个类的头部有一段注释,引发了我的好奇心,最终促成了 HIVE-15872 的修复。

 * A generic, re-usable histogram class that supports partial aggregations.
 * The algorithm is a heuristic adapted from the following paper:
 * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm",
 * J. Machine Learning Research 11 (2010), pp. 849--872. Although there are no approximation
 * guarantees, it appears to work well with adequate data and a large (e.g., 20-80) number
 * of histogram bins.
public class NumericHistogram {  

为何 Hive 的 histogram 实现会跟「决策树」扯上关系?看上去有点意思。


需要说明的是,percentile_approx 得到的是近似值,不同的系统或算法针对同一个 Query 很可能会得到不同的结果。Hive 和 Spark SQL 在这个问题上就有差异,两者无论具体实现还是设计思想都有所不同,后面会给出详细解释。


「Partial aggregation 」VS 「Combiners」

  • 为什么要提前把这俩概念提出来?

    对于大多数 MapReduce Job,整体性能的瓶颈往往出现在 shuffle 阶段的 I/O 上。为了减少 shuffle 阶段的 I/O,MapReduce 引入了 Combiner,其本质就是期望能够在 map 之后加一层 combine 聚合计算以减少 shuffle 到下游的数据规模。但是,Combiner 是有局限性的。一般来讲,CombinerClass 和 ReducerClass 都是同一个类。这要求 Combiner 或 Reducer 的具体实现方法满足「交换律」和「结合律」,类似 C(C(x3,x2), C(x1,x4)) <=> C(C(x1,x4), C(x2,x3)) 这样。常见的满足「交换律」和「结合律」的方法有 Max、Min、Count、Sum 等,而 histogram 或者 percentile 之类的聚合函数则不满足。Hive 为了让不同的 UDAF 在工程实现上保持一定的接口一致性,全部放弃使用 Combiner,转而使用 Partial aggregation。这也是 Hive 的源码里面没有一行代码调用了 JobConf#setCombinerClass 的原因。需要说明的是,Combiner 本质上是一种 「trade CPU for gains in IO」的做法,而 Partial aggregation 则是「trade Memory for IO gains by caching values (up to a threshold)」。

  • Partial aggregation 到底是怎么一回事?

    严格讲,Partial aggregation 不同于 Combiner,并不是 MapReduce 层面的概念,它只是应用层面的一种优化策略。Hive 有做这样的优化,另外一个叫做 Cascading 的 BigData 项目也有做类似的工作,我们这里只说下 Hive 的实现。Hive 里面存在一个叫做 GroupByOperator 的算子,它既可以工作在 ExecMapper,也可以工作在 ExecReducer,其内部存在一个 hashAggregations 的 HashMap(前面提到过这是一种牺牲内存而得到 IO 收益的做法):

    // Used by hash-based GroupBy: Mode = HASH, PARTIALS private transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;

    简言之,如果 Hive 需要做一个 UDAF 的计算,那么 ExecMapper 是需要在生成 时不断通过聚合计算更新 hashAggregations 同时在 shuffle 阶段输出进而由 ExecReducer 来做进一步的聚合。Partial 的含义就是,即便我们的结果是一个 value(例如一个 histogram),那么其计算也是一种从 partial 到 complete 的聚合过程。

  • Partial aggregation 对分布式 histogram 有何意义?

    后面介绍了 Hive 的 histogram 实现之后自然会明白。


在讨论 Hive 的 percentile_approx 实现之前,我们先看看 A Streaming Parallel Decision Tree Algorithm (发表于 2010 年)这篇论文到底要解决什么问题或者提出哪种算法给了 Hive 一些启发思想。其中论文摘要如下:

We propose a new algorithm for building decision tree classifiers. The algorithm is executed in a distributed environment and is especially designed for classifying large data sets and streaming

data. It is empirically shown to be as accurate as a standard decision tree classifier, while being

scalable for processing of streaming data on multiple processors. These findings are supported by

a rigorous analysis of the algorithm’s accuracy.

The essence of the algorithm is to quickly construct histograms at the processors, which compress the data to a fixed amount of memory. A master processor uses this information to find

near-optimal split points to terminal tree nodes. Our analysis shows that guarantees on the local

accuracy of split points imply guarantees on the overall tree accuracy.

这篇论文对 Hive 最大的启发就是提出了一种 On-line Histogram Building 算法。什么是 histogram?定义如下:

A histogram is a set of B pairs (called bins) of real numbers {(p1,m1),...,(pB,mB)}, where B is a preset constant integer.

这里的 pair,Hive 用了一个叫做 Coord 的 Class 来表示:

 * The Coord class defines a histogram bin, which is just an (x,y) pair.
static class Coord implements Comparable {  
  double x;
  double y;

  public int compareTo(Object other) {
    return, ((Coord) other).x);

在 histogram 的定义里面,有一个用于标识 bins 数量的常量 B。为何一定要引入这个常量?假设我们有一个简单的 sample 数据集(元素可重复):

[1, 1, 1, 2, 2, 2, 3, 4, 4, 5, 6, 7, 8, 9, 9, 10, 10]

简单心算一下就可以得到其 histogram 为:[(1, 3), (2, 3), (3, 1), (4, 2), (5, 1), (6, 1), (7, 1), (8, 1), (9, 2), (10, 2)]。可以看出,这个 histogram 内部的 bins(数据点和频率一起构成的 pair) 数组长度实质上就是 sample 数据集的基数(不同元素的个数)。histogram 的存储开销会随着 sample 数据集的基数线性增长,这意味着如果不做额外的优化,histogram 将无法适应超大规模数据集合的统计需求。常量 B 就是在这种背景下引入的,其目的在于控制 histogram 的 bins 数组长度(内存开销)。histogram 提供了四个基本的方法:update、merge、sum 和 uniform,为了简化问题,这里只介绍 update 和 merge。

  • update

    update 的作用是将一个 point(数据元素)添加到 histogram,同时会保证整个 histogram 的 bins 数组长度不超过常量 B。

    Hive:percentile_approx 原理与实现

    「步骤 7」的替换方法,应该是整个算法的关健点,感兴趣的同学可以想想这个替换公式是如何得来的。无论是为了控制 bins 数组长度,还是为了得到 percentile 的近似值,没有「步骤 7」的替换,都不可能完成。所谓 approximate,其实就是这里的 replace 引入的。 Hive 在工程实现上,对 update 的操作总是假设已经存在的 bins 数组已经有序(sort by bin.x asc),通过二分查找得到插入点。这个查找的结果可能会精确命中已经存在的 bin,也可能不会。前者只需要简单做下 increment 即可,后者则需要加入一个 trim 过程(也就是上面的「步骤 7」)。update(Hive 里面的方法名叫 add) 的具体实现如下:

            * Adds a new data point to the histogram approximation. Make sure you have
            * called either allocate() or merge() first. This method implements Algorithm #1
            * from Ben-Haim and Tom-Tov, "A Streaming Parallel Decision Tree Algorithm", JMLR 2010.
            * @param v The data point to add to the histogram approximation.
           public void add(double v) {
             // Binary search to find the closest bucket that v should go into.
             // 'bin' should be interpreted as the bin to shift right in order to accomodate
             // v. As a result, bin is in the range [0,N], where N means that the value v is
             // greater than all the N bins currently in the histogram. It is also possible that
             // a bucket centered at 'v' already exists, so this must be checked in the next step.
             int bin = 0;
             for(int l=0, r=nusedbins; l < r; ) {
               bin = (l+r)/2;
               if (bins.get(bin).x > v) {
                 r = bin;
               } else {
                 if (bins.get(bin).x < v) {
                   l = ++bin;
                 } else {
                   break; // break loop on equal comparator

             // If we found an exact bin match for value v, then just increment that bin's count.
             // Otherwise, we need to insert a new bin and trim the resulting histogram back to size.
             // A possible optimization here might be to set some threshold under which 'v' is just
             // assumed to be equal to the closest bin -- if fabs(v-bins[bin].x) < THRESHOLD, then
             // just increment 'bin'. This is not done now because we don't want to make any
             // assumptions about the range of numeric data being analyzed.
             if (bin < nusedbins && bins.get(bin).x == v) {
             } else {
               Coord newBin = new Coord();
               newBin.x = v;
               newBin.y = 1;
               bins.add(bin, newBin);

               // Trim the bins down to the correct number of bins.
               if (++nusedbins > nbins) {
  • merge

    merge 和 update 差别不大,只不过它是将两个 histogram 做合并,方法类似。

    Hive:percentile_approx 原理与实现

    这里仍然少不了 update 算法里面用到的「replace」,Hive 对应的代码实现如下:

    ``` public void merge(List other, DoubleObjectInspector doi) { if(other == null) { return; }

    if(nbins == 0 || nusedbins == 0)  {
           // Our aggregation buffer has nothing in it, so just copy over 'other'
           // by deserializing the ArrayList of (x,y) pairs into an array of Coord objects
           nbins = (int) doi.get(other.get(0));
           nusedbins = (other.size()-1)/2;
           bins = new ArrayList<Coord>(nusedbins);
           for (int i = 1; i < other.size(); i+=2) {
             Coord bin = new Coord();
             bin.x = doi.get(other.get(i));
             bin.y = doi.get(other.get(i+1));
         } else {
           // The aggregation buffer already contains a partial histogram. Therefore, we need
           // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov paper.
    ArrayList<Coord> tmp_bins = new ArrayList<Coord>(nusedbins + (other.size()-1)/2);
       // Copy all the histogram bins from us and 'other' into an overstuffed histogram
       for (int i = 0; i < nusedbins; i++) {
         Coord bin = new Coord();
         bin.x = bins.get(i).x;
         bin.y = bins.get(i).y;
       for (int j = 1; j < other.size(); j += 2) {
         Coord bin = new Coord();
         bin.x = doi.get(other.get(j));
         bin.y = doi.get(other.get(j+1));
       // Now trim the overstuffed histogram down to the correct number of bins
       bins = tmp_bins;
       nusedbins += (other.size()-1)/2;



    从代码的实现来看,other 这个 List(结构为 [nbins, bin0.x, bin0.y, bin1.x, bin1.y, ..., binB.x, binB.y]),其长度必须是奇数。但是,我们前面提到的那个工单,它的错误日志里面拿到的这个 other 的长度为 2(偶数)且数据内容非常奇怪([0.0, 10000.0],nbins 怎么可能是 0?),后面会分析产生这个奇怪数组的原因。


对 histogram 来说,percentile approx 算是它的一个应用 case(histogram 不仅仅可以用来计算 percentile)。Hive 的 percentile approx 由 GenericUDAFPercentileApprox 实现,其核心实现是在 histogram 的 bins 数组前面加上一个用于标识分位点的序列。以上面的 HQL 为例,percentile_approx 的 partial aggregations 序列化之后的结果如下:

// [分位点个数,分位点1,分位点2,分位点3,分位点4,分位点5,bins 数组长度上限 B,bin0.x, bin0.y, bin1.x, bin1.y, ..., binK.x, binK.y],bins 的默认长度上限是 10000
[5, 0.50, 0.70, 0.90, 0.95, 0.99, 10000, x0, y0, x1, y1, ..., xK, yK]

理解了上面这个结构,也就不难理解为何 Hive 的 PercentileAggBuf 会有如下源码:

static class PercentileAggBuf extends AbstractAggregationBuffer {  
  NumericHistogram histogram;   // histogram used for quantile approximation
  double[] quantiles;           // the quantiles requested
  public int estimate() {
    JavaDataModel model = JavaDataModel.get();
    return model.lengthFor(histogram) +
        model.array() + JavaDataModel.PRIMITIVES2 * quantiles.length;

同 histogram 相对应,percentile approx 也有一个 merge 过程(基本思想仍然是 Partial aggregation)。这个 merge 最主要的操作仍然是 PercentileAggBuf#histogram 层面的 merge,而前面的 quantiles 数组在 merge 过程中并没有什么用途,主要用在最终依据分位点获取相关结果上。percentile approx 在做 merge 时,需要将上面那个数组最前面的 quantiles 子序列删除,仅保留 histogram 序列(长度为奇数)。

int nquantiles = (int) doi.get(partialHistogram.get(0));  
if(nquantiles > 0) {  
  myagg.quantiles = new double[nquantiles];
  for(int i = 1; i <= nquantiles; i++) {
    myagg.quantiles[i-1] = doi.get(partialHistogram.get(i));
  partialHistogram.subList(0, nquantiles+1).clear();

前面那个工单提到的 bug,实际上就是这一步的 clear 操作仅仅针对 nquantiles 大于 0 的情况作了考虑,而针对 nquantiles 为 0 的情况则是完全忽略(这份代码存在了 6 年)。从前面的 HQL 可以知道,我们传入的 quantiles 数组是 array(0.50, 0.70, 0.90, 0.95, 0.99),其长度为 5,nquantiles 本不应该为 0 的。那这个 0 是从哪来的?最后追查代码发现,如果一个 Query 的在 map 阶段经过 filter 之后得到的数据为 NULL,那么在这个 NULL 数据集上做 percentile approx 时,map 端的 GroupByOperator 所做的事情只会对 PercentileAggBuf 填充 [0(分位点个数), 10000(histogram 的 bins 数组默认最大长度)]。这才导致了 stacktrace 里面那行导致 IndexOutOfBoundsException 的奇怪 value:{" col0":[0.0,10000.0]}。追到这里,Hive 的这个隐藏 bug 算是被定位清楚了,之后的提交 patch 并被 commit 就水到渠成了。验证这个 bug 的数据可以通过以下 HQL 得到(查询结果是 NULL):

    date = '20170207'
    and column1 = 'value1'
    and column2 = 'value2'
    and column3 = 'value3'
    and column4 = 'value4'
    and column5 = 'value5'

bug 修复了,那么 percentile_approx 是如何从 histogram 里面计算出来的呢?histogram 相关代码实现如下:

 * Gets an approximate quantile value from the current histogram. Some popular
 * quantiles are 0.5 (median), 0.95, and 0.98.
 * @param q The requested quantile, must be strictly within the range (0,1).
 * @return The quantile value.
public double quantile(double q) {  
  assert(bins != null && nusedbins > 0 && nbins > 0);
  double sum = 0, csum = 0;
  int b;
  for(b = 0; b < nusedbins; b++)  {
    sum += bins.get(b).y;
  for(b = 0; b < nusedbins; b++) {
    csum += bins.get(b).y;
    if(csum / sum >= q) {
      if(b == 0) {
        return bins.get(b).x;

      csum -= bins.get(b).y;
      double r = bins.get(b-1).x +
        (q*sum - csum) * (bins.get(b).x - bins.get(b-1).x)/(bins.get(b).y);
      return r;
  return -1; // for Xlint, code will never reach here

Spark SQL

讨论完了 Hive 的 percentile_ approx,我们再来看看 Spark SQL 是如何实现的。Spark SQL 和 Hive 使用了完全不一样的算法,甚至连参考的论文都差了近十年。Spark SQL 的 percentile_approx 是在 SPARK-16283 得到实现和支持的,其参考的论文 Space-Efficient Online Computation of Quantile Summaries 发表于 2001 年。由于篇幅有限加上 Spark SQL 这一块并没有发现 bug,所以不做过多介绍。感兴趣的同学可以翻看论文和 源码

