Spark+Hbase 亿级流量分析实战( PV/UV )

栏目: 数据库 · 发布时间: 6年前

内容简介:作为一个百亿级的流量实时分析统计系统怎么能没有PV /UV 这两经典的超级玛丽亚指标呢,话说五百年前它俩可是鼻祖,咳咳...,不好意思没忍住,回归正文,我们先理一下整个程序的计算流程,请看大图:先从 DriverMain 入口开始撸起

作为一个百亿级的流量实时分析统计系统怎么能没有PV /UV 这两经典的超级玛丽亚指标呢,话说五百年前它俩可是鼻祖,咳咳...,不好意思没忍住,回归正文, 大猪 在上一篇已经介绍了 小巧高性能ETL程序设计与实现 了,到现在,我们的数据已经落地到Hbase 上了,而且日志的时间也已经写到 Mysql 了,万事都已经具备了,接下来我们就要撸指标了,先从两个经典的指标开始撸。

程序流程

我们先理一下整个程序的计算流程,请看大图:

Spark+Hbase 亿级流量分析实战( PV/UV )
  1. 开始计算是我们的 Driver 程序入口

  2. 开始计算之前检查监听 Redis 有没有收到程序退出通知,如果有程序结束,否则往下执行

  3. 首先去查询我们上篇文章的ETL loghub 日志的进度的平均时间点

  4. Switch 处是判断loghub 的时间距离我们上次计算的指标时间是否相差足够时间,一般定义为3分钟时间之后,因为loghub 的时间会有少量的波动情况

  5. 不满足则 Sleep 30秒,可以自己控制Sleep范围。

  6. 满足则计算 上次指标计算结束时间 ~ (loghub时间 - 3分钟日志波动)

  7. 计算完成更新指标结果并且更新指标计算时间,然后回到第 2 点。

程序实现

先从 DriverMain 入口开始撸起

//监听redis退出消息
while (appRunning) {
      val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
      //日志offset
      val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
      //指标计算offset
      val indicatorTime =dbClient.query("indicator").toLocalDateTime
      //两个时间相差(分)
      val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes

      val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
      //相差足够时间则进行指标运行,否则睡眠
      if (betweenTimeMinutes >= 1) {
        app.run(spark, indicatorTime, loghubTime)
        //计算完成更新指标时间
        dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset")
      } else {
        //让我们的老大哥睡会,别太累了
        TimeUnit.SECONDS.sleep(30)
      }
    }
复制代码

从注释上看,整体思路还是比较清晰的。

接下来我们跟着往下看 run 里面的方法做了什么有意思的操作

conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
conf.set("TableInputFormat.SCAN_ROW_START", start)
conf.set("TableInputFormat.SCAN_ROW_START", end)
val logDS = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat2],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
      .map(tp2 => HbaseUtil.resultToMap(tp2._2))
      .map(map => {
        LogCase(
          //子case类,存放多种格式的时间
          dt = DT(
            map.get("time").toLocalDateTimeStr(),
            map.get("time").toLocalDate().toString
          ),
          `type` = map.get("type"),
          aid = map.get("aid"),
          uid = map.get("uid"),
          tid = map.get("tid"),
          ip = map.get("ip")
        )
      }).toDS()

    logDS.cache()
    logDS.createTempView("log")
    //各类指标
    new PV().run()
    new UV().run()
复制代码

startend 就是上面传下来需要查询的日志时间范围

简要说明:就是把Hbase的时间范围数据转成SparkSQL中的一张 log

在UV 跟PV 指标计算里面就可以使用这张 log 表了

我们看看这两个经典的指标里面到底有什么乾坤:

spark.sql(
      """
        |SELECT
        |    aid,
        |    dt.date,
        |    COUNT(1) as pv
        |FROM
        |    log
        |GROUP BY
        |    aid,
        |    dt.date
      """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })
复制代码

哇然一看,大哥你这也写得太简单了吧

不就是一个普通的 PV 算法,再加上分区 foreachPartition 操作把更到的每一行聚合的结果数据 upsert 到我们的 common_report 指标表

group by后面跟上要聚合的维度,以上是想统计每篇文章每天的PV

从这个方法我们就能推算出 common_report 长什么样了,至少有 time + aid 这两个唯一索引字段,还有pv这个字段,默认值肯定是 0

百闻不如一见,看看表的DDL 是不是这样子:

create table common_report
(
	id bigint auto_increment primary key,
	aid bigint not null,
	pv int default 0 null,
	uv int default 0 null,
	time date not null,
	constraint common_report_aid_time_uindex unique (aid, time)
);
复制代码

果然一点都没错。

Spark+Hbase 亿级流量分析实战( PV/UV )

再看 dbClient.upsert 里面大概也能猜到是实现了mysql的upsert功能,大概的 sql 就会生成下面格式:

INSERT INTO common_report (time, aid, pv)
VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1;
复制代码

大猪那 UV 是怎么实现咧?一个用户在今天来过第一次之后再来就不能重复计算了噢。

大猪答:这个简单简单,可以使用 Redis 去重嘛,但是我们使用的都是 Hbase 了,还使用它做啥子咧,具体我们看一下 UV 里面到底是如何实现的:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
    import spark.implicits._
    logDS
      .mapPartitions(partitionT => {
        val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
        val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
        partitionT
          .grouped(Consts.BATCH_MAPPARTITIONS)
          .flatMap { tList =>
            tList
              .zip(hbaseClient.incrments(tList.map(md5)))
              .map(tp2 => {
                val log = tp2._1
                log.copy(ext = EXT(tp2._2))
              })
          }
      }).createTempView("uvTable")

    spark.sql(
      """
        |SELECT
        |    aid,
        |    dt.date,
        |    COUNT(1) as uv
        |FROM
        |    uvTable
        |WHERE
        |    ext.render = 1
        |GROUP BY
        |    aid,
        |    dt.date
      """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })
复制代码

spark.sql 这里跟PV一样嘛,就是多了一句条件 ext.render = 1 ,但是上面那一大堆是啥子咧?

大猪CACHE_TABLE 是什么来的,是Hbase一张中间表,用户存用户UV标记的,建表语句如下,因为维度都是按天,所以我们TTL设计3天就可以了,两天也可以。

create 'CACHE_FOR_TEST',{NAME => 'info',TTL => '3 DAYS',CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy','KeyPrefixRegionSplitPolicy.prefix_length'=>'2'},COMPRESSION=>'SNAPPY'},SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
复制代码

那还有其它的呢?

莫慌莫慌, 大猪 这就慢慢解释道:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
复制代码

上面这句的意思就是就是把log表给取出来,当然也可以通过参数传递。

下面的 mapPartitions 挺有意思的:

partitionT
    .grouped(1000)
        .flatMap { tList =>
          tList
            .zip(hbaseClient.incrments(tList.map(md5)))
            .map(tp2 => {
              val log = tp2._1
              log.copy(ext = EXT(tp2._2))
            })
        }
复制代码

实际上面是处理每个分区的数据,也就是转换数据,我们每来一条数据就要去Hbase那 incrment 一次,返回来的结果就是 render ,用户今天来多少次就 incrment 相应的次数。

那有什么用?我直接从Hbase GET 取出数据,再判断有没有,如果没有这个用户就是今天第一次来,再把这个用户 PUT 进Hbase打一个标记,so easy。

其实当初我们也是这么做的,后面发现业务的东西还是放在SQL里面一起写比较好,容易维护,而且incrment好处多多,因为它是带事务的,可以多线程进行修改。

而且你们也发现了 GETPUT 是两次请求操作,保证不了事务的,指标几千万的数据少了那么几条,你们都不知道我当初找它们有辛苦。

Spark+Hbase 亿级流量分析实战( PV/UV )

你们有没有发现 render = 1 的时候是代表UV(刚好等于1的时候为什么是UV?这里大家要慢慢地品尝一下了,其实就是实现了 GETPUT 操作),如果 render = 2 的时候又可以代表今天来过两次以上的用户指标,随时扩展,就问你撸这样的代码结构爽不爽?

Spark+Hbase 亿级流量分析实战( PV/UV )

看看 incrments 方法实现了啥子

def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
    if (incs.isEmpty) {
      Seq[Long]()
    } else {
      require(incs.head.length == 32, "pk require 32 length")
      val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
      val results = new Array[Object](convertIncs.length)
      table.batch(convertIncs.asJava, results)
      results.array.indices.map(
        ind =>
          Bytes.toLong(
            results(ind)
              .asInstanceOf[Result]
              .getValue(
                Bytes.toBytes(family),
                Bytes.toBytes(incs(ind).takeRight(24))
              )
          )
      )
    }
  }
复制代码

这个方法就是实现了 incrment 的批量处理,因为我们在线上生产环境的时候测试过,批量处理比单条处理性能高了上百倍,所以这也就是为什么要写在 mapPartitions 里面的原因了,因为只有在这个方法里面才有批量数据转换操作, foreachPartition 是批量处理操作, foreach ,与 map 是一条一条操作不能使用,我们在输出报表到Mysql的地方已经用到过了。

大猪不知不觉已经写了那么长的文章了

Spark+Hbase 亿级流量分析实战( PV/UV )

关闭计算程序只需要给redis发一条stop消息就可以啦

RedisUtil().getResource.publish("computeListenerMessage", "stop")
复制代码

不能再复制代码了,不能显得文章是靠代码撑起来的。

Spark+Hbase 亿级流量分析实战( PV/UV )

福利 完整项目源码

Spark+Hbase 亿级流量分析实战( PV/UV )

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Fluent Python

Fluent Python

Luciano Ramalho / O'Reilly Media / 2015-8-20 / USD 39.99

Learn how to write idiomatic, effective Python code by leveraging its best features. Python's simplicity quickly lets you become productive with it, but this often means you aren’t using everything th......一起来看看 《Fluent Python》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具