内容简介:作为一个百亿级的流量实时分析统计系统怎么能没有PV /UV 这两经典的超级玛丽亚指标呢,话说五百年前它俩可是鼻祖,咳咳...,不好意思没忍住,回归正文,我们先理一下整个程序的计算流程,请看大图:先从 DriverMain 入口开始撸起
作为一个百亿级的流量实时分析统计系统怎么能没有PV /UV 这两经典的超级玛丽亚指标呢,话说五百年前它俩可是鼻祖,咳咳...,不好意思没忍住,回归正文, 大猪 在上一篇已经介绍了 小巧高性能ETL程序设计与实现 了,到现在,我们的数据已经落地到Hbase 上了,而且日志的时间也已经写到 Mysql 了,万事都已经具备了,接下来我们就要撸指标了,先从两个经典的指标开始撸。
程序流程
我们先理一下整个程序的计算流程,请看大图:
-
开始计算是我们的 Driver 程序入口
-
开始计算之前检查监听 Redis 有没有收到程序退出通知,如果有程序结束,否则往下执行
-
首先去查询我们上篇文章的ETL loghub 日志的进度的平均时间点
-
Switch 处是判断loghub 的时间距离我们上次计算的指标时间是否相差足够时间,一般定义为3分钟时间之后,因为loghub 的时间会有少量的波动情况
-
不满足则 Sleep 30秒,可以自己控制Sleep范围。
-
满足则计算
上次指标计算结束时间
~(loghub时间 - 3分钟日志波动)
-
计算完成更新指标结果并且更新指标计算时间,然后回到第 2 点。
程序实现
先从 DriverMain 入口开始撸起
//监听redis退出消息 while (appRunning) { val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) //日志offset val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3) //指标计算offset val indicatorTime =dbClient.query("indicator").toLocalDateTime //两个时间相差(分) val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS") //相差足够时间则进行指标运行,否则睡眠 if (betweenTimeMinutes >= 1) { app.run(spark, indicatorTime, loghubTime) //计算完成更新指标时间 dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset") } else { //让我们的老大哥睡会,别太累了 TimeUnit.SECONDS.sleep(30) } } 复制代码
从注释上看,整体思路还是比较清晰的。
接下来我们跟着往下看 run
里面的方法做了什么有意思的操作
conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE) conf.set("TableInputFormat.SCAN_ROW_START", start) conf.set("TableInputFormat.SCAN_ROW_START", end) val logDS = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat2], classOf[ImmutableBytesWritable], classOf[Result] ) .map(tp2 => HbaseUtil.resultToMap(tp2._2)) .map(map => { LogCase( //子case类,存放多种格式的时间 dt = DT( map.get("time").toLocalDateTimeStr(), map.get("time").toLocalDate().toString ), `type` = map.get("type"), aid = map.get("aid"), uid = map.get("uid"), tid = map.get("tid"), ip = map.get("ip") ) }).toDS() logDS.cache() logDS.createTempView("log") //各类指标 new PV().run() new UV().run() 复制代码
start
跟 end
就是上面传下来需要查询的日志时间范围
简要说明:就是把Hbase的时间范围数据转成SparkSQL中的一张 log
表
在UV 跟PV 指标计算里面就可以使用这张 log
表了
我们看看这两个经典的指标里面到底有什么乾坤:
spark.sql( """ |SELECT | aid, | dt.date, | COUNT(1) as pv |FROM | log |GROUP BY | aid, | dt.date """.stripMargin) .rdd .foreachPartition(rows => { val props = PropsUtils.properties("db") val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) rows.foreach(row => { dbClient.upsert( Map( "time" -> row.getAs[String]("date"), "aid" -> row.getAs[String]("aid") ), Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)), "common_report" ) }) dbClient.close() }) 复制代码
哇然一看,大哥你这也写得太简单了吧
不就是一个普通的 PV 算法,再加上分区 foreachPartition
操作把更到的每一行聚合的结果数据 upsert
到我们的 common_report
指标表
group by后面跟上要聚合的维度,以上是想统计每篇文章每天的PV
从这个方法我们就能推算出 common_report
长什么样了,至少有 time
+ aid
这两个唯一索引字段,还有pv这个字段,默认值肯定是 0
百闻不如一见,看看表的DDL 是不是这样子:
create table common_report ( id bigint auto_increment primary key, aid bigint not null, pv int default 0 null, uv int default 0 null, time date not null, constraint common_report_aid_time_uindex unique (aid, time) ); 复制代码
果然一点都没错。
再看 dbClient.upsert 里面大概也能猜到是实现了mysql的upsert功能,大概的 sql 就会生成下面格式:
INSERT INTO common_report (time, aid, pv) VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1; 复制代码
大猪那 UV 是怎么实现咧?一个用户在今天来过第一次之后再来就不能重复计算了噢。
大猪答:这个简单简单,可以使用 Redis
去重嘛,但是我们使用的都是 Hbase
了,还使用它做啥子咧,具体我们看一下 UV 里面到底是如何实现的:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase]) import spark.implicits._ logDS .mapPartitions(partitionT => { val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE) val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes) partitionT .grouped(Consts.BATCH_MAPPARTITIONS) .flatMap { tList => tList .zip(hbaseClient.incrments(tList.map(md5))) .map(tp2 => { val log = tp2._1 log.copy(ext = EXT(tp2._2)) }) } }).createTempView("uvTable") spark.sql( """ |SELECT | aid, | dt.date, | COUNT(1) as uv |FROM | uvTable |WHERE | ext.render = 1 |GROUP BY | aid, | dt.date """.stripMargin) .rdd .foreachPartition(rows => { val props = PropsUtils.properties("db") val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) rows.foreach(row => { dbClient.upsert( Map( "time" -> row.getAs[String]("date"), "aid" -> row.getAs[String]("aid") ), Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)), "common_report" ) }) dbClient.close() }) 复制代码
spark.sql 这里跟PV一样嘛,就是多了一句条件 ext.render = 1
,但是上面那一大堆是啥子咧?
大猪CACHE_TABLE 是什么来的,是Hbase一张中间表,用户存用户UV标记的,建表语句如下,因为维度都是按天,所以我们TTL设计3天就可以了,两天也可以。
create 'CACHE_FOR_TEST',{NAME => 'info',TTL => '3 DAYS',CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy','KeyPrefixRegionSplitPolicy.prefix_length'=>'2'},COMPRESSION=>'SNAPPY'},SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0'] 复制代码
那还有其它的呢?
莫慌莫慌, 大猪 这就慢慢解释道:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase]) 复制代码
上面这句的意思就是就是把log表给取出来,当然也可以通过参数传递。
下面的 mapPartitions
挺有意思的:
partitionT .grouped(1000) .flatMap { tList => tList .zip(hbaseClient.incrments(tList.map(md5))) .map(tp2 => { val log = tp2._1 log.copy(ext = EXT(tp2._2)) }) } 复制代码
实际上面是处理每个分区的数据,也就是转换数据,我们每来一条数据就要去Hbase那 incrment
一次,返回来的结果就是 render ,用户今天来多少次就 incrment
相应的次数。
那有什么用?我直接从Hbase GET
取出数据,再判断有没有,如果没有这个用户就是今天第一次来,再把这个用户 PUT
进Hbase打一个标记,so easy。
其实当初我们也是这么做的,后面发现业务的东西还是放在SQL里面一起写比较好,容易维护,而且incrment好处多多,因为它是带事务的,可以多线程进行修改。
而且你们也发现了 GET
跟 PUT
是两次请求操作,保证不了事务的,指标几千万的数据少了那么几条,你们都不知道我当初找它们有辛苦。
你们有没有发现 render = 1
的时候是代表UV(刚好等于1的时候为什么是UV?这里大家要慢慢地品尝一下了,其实就是实现了 GET
跟 PUT
操作),如果 render = 2
的时候又可以代表今天来过两次以上的用户指标,随时扩展,就问你撸这样的代码结构爽不爽?
看看 incrments 方法实现了啥子
def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = { if (incs.isEmpty) { Seq[Long]() } else { require(incs.head.length == 32, "pk require 32 length") val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) } val results = new Array[Object](convertIncs.length) table.batch(convertIncs.asJava, results) results.array.indices.map( ind => Bytes.toLong( results(ind) .asInstanceOf[Result] .getValue( Bytes.toBytes(family), Bytes.toBytes(incs(ind).takeRight(24)) ) ) ) } } 复制代码
这个方法就是实现了 incrment 的批量处理,因为我们在线上生产环境的时候测试过,批量处理比单条处理性能高了上百倍,所以这也就是为什么要写在 mapPartitions
里面的原因了,因为只有在这个方法里面才有批量数据转换操作, foreachPartition
是批量处理操作, foreach
,与 map
是一条一条操作不能使用,我们在输出报表到Mysql的地方已经用到过了。
大猪不知不觉已经写了那么长的文章了
关闭计算程序只需要给redis发一条stop消息就可以啦
RedisUtil().getResource.publish("computeListenerMessage", "stop") 复制代码
不能再复制代码了,不能显得文章是靠代码撑起来的。
福利 完整项目源码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- k8s中流量分离以及资源隔离实战
- Spark+Hbase 亿级流量分析实战(日志存储设计)
- Sentinel:分布式系统的流量防卫兵基础实战
- Sentinel:分布式系统的流量防卫兵进阶实战
- Spark+Hbase 亿级流量分析实战(数据结构设计)
- 2 周流量激增百倍的腾讯课堂后台扩容和性能优化实战
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。