Spark综合使用及电商案例广告点击量实时统计分析实战-Spark商业应用实战

栏目: 数据库 · 发布时间: 5年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

1 广告点击数据模型

1.1 数据格式

timestamp 	  province 	  city        userid         adid
复制代码

1.2 生成动态黑名单

数据格式
    (timestamp province city userid adid)
    
    统计单用户的统计次数
    (date, userid,adid,clickCount)
    
    阈值统计统计黑名单
复制代码
Spark综合使用及电商案例广告点击量实时统计分析实战-Spark商业应用实战

1.3 计算广告点击流量实时统计结果

输入数据格式
    (userid, timestamp province city userid adid)
    
    计算后数据格式并持久化
    (date,province,city,adid,clickCount)
复制代码
Spark综合使用及电商案例广告点击量实时统计分析实战-Spark商业应用实战

1.4 实时统计每天每个省份top3热门广告

输入数据格式
   (yyyyMMdd_province_city_adid,clickCount)
    
    计算后数据格式并持久化
    (date,province, adid,clickCount)
    
    注册成表ROW_NUMBER()实现窗聚合
    tmp_daily_ad_click_count_by_prov
复制代码
Spark综合使用及电商案例广告点击量实时统计分析实战-Spark商业应用实战

1.5 实时统计每天每个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

输入数据格式
    (timestamp province city userid adid)
    
    计算后数据格式并持久化
    (date,hour,minute,adid,clickCount)
复制代码
Spark综合使用及电商案例广告点击量实时统计分析实战-Spark商业应用实战

2 具体技术实现

2.1 SparkStreaming 与Kafka对接

  • 构建Spark上下文

    val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
    
      // 创建Spark客户端
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
      val sc = spark.sparkContext
      val ssc = new StreamingContext(sc, Seconds(5))
      
      // 获取Kafka配置
      val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
      val topics = ConfigurationManager.config.getString("kafka.topics")
    复制代码
  • kafka消费者配置

    val kafkaParam = Map(
       "bootstrap.servers" -> broker_list,//用于初始化链接到集群的地址
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       //用于标识这个消费者属于哪个消费团体
       "group.id" -> "commerce-consumer-group",
       //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
       //可以使用这个配置,latest自动重置偏移量为最新的偏移量
       //earilist:提交过分区,从Offset处读取,如果没有提交过offset,从头读取
       //latest:提交过分区,从Offset处读取,没有从最新的数据开始读取
       //None:如果没有提交offset,就会报错,提交过offset,就从offset处读取
       "auto.offset.reset" -> "latest",
       //如果是true,则这个消费者的偏移量会在后台自动提交
       "enable.auto.commit" -> (false: java.lang.Boolean)
     )
    复制代码
  • 设置检查点目录

    ssc.checkpoint("./streaming_checkpoint")
    复制代码
  • LocationStrategies 分配分区策略

    // 创建DStream,返回接收到的输入数据
      // LocationStrategies:根据给定的主题和集群地址创建consumer
      // LocationStrategies.PreferConsistent:持续的在所有Executor之间匀分配分区 (均匀分配,选中的每一个Executor都会分配 partition)
      // LocationStrategies.PreferBrokers: 如果executor和kafka brokers 在同一台机器上,选择该executor。
      // LocationStrategies.PreferFixed: 如果机器不是均匀的情况下,可以指定特殊的hosts。当然如果不指定,采用 LocationStrategies.PreferConsistent模式
    复制代码
  • ConsumerStrategies 消费策略

    // ConsumerStrategies:选择如何在Driver和Executor上创建和配置Kafka Consumer
      // ConsumerStrategies.Subscribe:订阅一系列主题
      val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
              LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
    复制代码
  • SparkStreaming开始消费

    var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
    复制代码

2.2 SparkStreaming 开始根据黑名单进行过滤

  • 算法过程如下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)

  • 根据userId进行过滤

    用于Kafka Stream的线程非安全问题,重新分区切断血统
      adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
     
      val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)
      
      def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {
         
          // 刚刚接受到原始的用户点击行为日志之后
          // 根据 mysql 中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)
          // 使用transform算子(将dstream中的每个batch RDD进行处理,转换为任意的其他RDD,功能很强大)
      
          val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>
      
            //格式 :timestamp province city userid adid
            //某个时间点 某个省份 某个城市 某个用户 某个广告
      
            // 首先,从mysql中查询所有黑名单用户,将其转换为一个rdd
            val adBlacklists = AdBlacklistDAO.findAll()
            // (userid, timestamp province city userid adid)
            val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))
      
      
            //格式 :timestamp province city userid adid
            val mappedRDD = consumerRecordRDD.map(consumerRecord => {
              val userid = consumerRecord.split(" ")(3).toLong
              (userid,consumerRecord)
            })
      
            // 将原始日志数据rdd,与黑名单rdd,进行左外连接
            // 如果说原始日志的userid,没有在对应的黑名单中,join不到,左外连接
            // 用inner join,内连接,会导致数据丢失
            val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)
      
            val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>
              // 如果这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户
              if(black.isDefined && black.get) false else true
            }
      
            filteredRDD.map{ case (userid,(log, black)) => (userid, log)}
          }
      
          filteredAdRealTimeLogDStream
    }
    复制代码

2.3 SparkStreaming 生成动态黑名单

  • 转化为用户粒度进行过滤,抛弃 province city ,格式为:(date, userid,adid,clickCount)

  • 过滤次数大于阈值的userId,持久化到磁盘。

    generateDynamicBlacklist(filteredAdRealTimeLogDStream)
      
      def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {
          
          // (timestamp province city userid adid)
          // 计算出每5个秒内的数据中,每天每个用户每个广告的点击量
          // 通过对原始实时日志的处理
          // 将日志的格式处理成<yyyyMMdd_userid_adid, 1L>格式
          val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>
      
            // 从tuple中获取到每一条原始的实时日志
            val logSplited = log.split(" ")
      
            // 提取出日期(yyyyMMdd)、userid、adid
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val userid = logSplited(3).toLong
            val adid = logSplited(4)
      
            // 拼接key
            val key = datekey + "_" + userid + "_" + adid
            (key, 1L)
          }
      
          // 针对处理后的日志格式,执行reduceByKey算子即可,(每个batch中)每天每个用户对每个广告的点击量
          val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)
      
          // 源源不断的,每个5s的batch中,当天每个用户对每支广告的点击次数
          // <yyyyMMdd_userid_adid, clickCount>
          dailyUserAdClickCountDStream.foreachRDD{ rdd =>
            rdd.foreachPartition{ items =>
              // 对每个分区的数据就去获取一次连接对象
              // 每次都是从连接池中获取,而不是每次都创建
              // 写数据库操作,性能已经提到最高了
      
              val adUserClickCounts = ArrayBuffer[AdUserClickCount]()
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
                // yyyy-MM-dd
                val userid = keySplited(1).toLong
                val adid = keySplited(2).toLong
                val clickCount = item._2
      
                //批量插入
                adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)
              }
              AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
            }
          }
      
          // 现在我们在mysql里面,已经有了累计的每天各用户对各广告的点击量
          // 遍历每个batch中的所有记录,对每条记录都要去查询一下,这一天这个用户对这个广告的累计点击量是多少
          // 从mysql中查询
          // 查询出来的结果,如果是100,如果你发现某个用户某天对某个广告的点击量已经大于等于100了
          // 那么就判定这个用户就是黑名单用户,就写入mysql的表中,持久化
          val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>
            val keySplited = key.split("_")
      
            // yyyyMMdd -> yyyy-MM-dd
            val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
            val userid = keySplited(1).toLong
            val adid = keySplited(2).toLong
      
            // 从mysql中查询指定日期指定用户对指定广告的点击量
            val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
      
            // 判断,如果点击量大于等于100,ok,那么不好意思,你就是黑名单用户
            // 那么就拉入黑名单,返回true
            if(clickCount >= 100) {
              true
            }else{
              // 反之,如果点击量小于100的,那么就暂时不要管它了
              false
            }
      } 
    复制代码

2.4 计算广告点击流量实时统计结果(yyyyMMdd_province_city_adid,clickCount)

  • 转化为省城市粒度进行过滤,抛弃userId,格式为:(yyyyMMdd_province_city_adid,clickCount)

    val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)
      
      def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {
      
          // 计算每天各省各城市各广告的点击量
          // 设计出来几个维度:日期、省份、城市、广告
          // 2015-12-01,当天,可以看到当天所有的实时数据(动态改变),比如江苏省南京市
          // 广告可以进行选择(广告主、广告名称、广告类型来筛选一个出来)
          // 拿着date、province、city、adid,去mysql中查询最新的数据
          // 等等,基于这几个维度,以及这份动态改变的数据,是可以实现比较灵活的广告点击流量查看的功能的
      
          // date province city userid adid
          // date_province_city_adid,作为key;1作为value
          // 通过spark,直接统计出来全局的点击次数,在spark集群中保留一份;在mysql中,也保留一份
          // 我们要对原始数据进行map,映射成<date_province_city_adid,1>格式
          // 然后呢,对上述格式的数据,执行updateStateByKey算子
          // spark streaming特有的一种算子,在spark集群内存中,维护一份key的全局状态
          //(userid, timestamp province city userid adid)
          val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>
            val logSplited = log.split(" ")
      
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val province = logSplited(1)
            val city = logSplited(2)
            val adid = logSplited(4).toLong
      
            val key = datekey + "_" + province + "_" + city + "_" + adid
      
            (key, 1L)
          }
      
          // 在这个dstream中,就相当于,有每个batch rdd累加的各个key(各天各省份各城市各广告的点击次数)
          // 每次计算出最新的值,就在aggregatedDStream中的每个batch rdd中反应出来
          val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
            // 举例来说
            // 对于每个key,都会调用一次这个方法
            // 比如key是<20151201_Jiangsu_Nanjing_10001,1>,就会来调用一次这个方法7
            // 10个
      
            // values,(1,1,1,1,1,1,1,1,1,1)
      
            // 首先根据optional判断,之前这个key,是否有对应的状态
            var clickCount = 0L
      
            // 如果说,之前是存在这个状态的,那么就以之前的状态作为起点,进行值的累加
            if(old.isDefined) {
              clickCount = old.get
            }
      
            // values,代表了,batch rdd中,每个key对应的所有的值
            for(value <- values) {
              clickCount += value
            }
      
            Some(clickCount)
          }
      
          // 将计算出来的最新结果,同步一份到mysql中,以便于j2ee系统使用
          aggregatedDStream.foreachRDD{ rdd =>
      
            rdd.foreachPartition{ items =>
      
              //批量保存到数据库
              val adStats = ArrayBuffer[AdStat]()
      
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = keySplited(0)
                val province = keySplited(1)
                val city = keySplited(2)
                val adid = keySplited(3).toLong
      
                val clickCount = item._2
                adStats += AdStat(date,province,city,adid,clickCount)
              }
              AdStatDAO.updateBatch(adStats.toArray)
      
            }
      
          }
      
          aggregatedDStream
    }
    复制代码

2.5 实时统计每天每个省份top3热门广告

  • 转化为省粒度进行过滤,抛弃userId ,cityid,格式为:(yyyyMMdd_province_adid,clickCount)

  • 注册成表,基于ROW_NUMBER()实现窗聚合,按照province分区,实现top3排序,

    tmp_daily_ad_click_count_by_prov
    
     calculateProvinceTop3Ad(spark,adRealTimeStatDStream)
     
     def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {
    
     // 每一个batch rdd,都代表了最新的全量的每天各省份各城市各广告的点击量
     //(yyyyMMdd_province_city_adid,clickCount)
     val rowsDStream = adRealTimeStatDStream.transform{ rdd =>
    
       // <yyyyMMdd_province_city_adid, clickCount>
       // <yyyyMMdd_province_adid, clickCount>
    
       // 计算出每天各省份各广告的点击量
       val mappedRDD = rdd.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val date = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(3).toLong
         val clickCount = count
    
         val key = date + "_" + province + "_" + adid
         (key, clickCount)
       }
    
       val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )
    
       // 将dailyAdClickCountByProvinceRDD转换为DataFrame
       // 注册为一张临时表
       // 使用Spark SQL,通过开窗函数,获取到各省份的top3热门广告
       val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val datekey = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(2).toLong
         val clickCount = count
    
         val date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))
    
         (date, province, adid, clickCount)
    
       }
    
       import spark.implicits._
       val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")
    
       // 将dailyAdClickCountByProvinceDF,注册成一张临时表
       dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
    
       // 使用Spark SQL执行 SQL 语句,配合开窗函数,统计出各身份top3热门的广告
       val provinceTop3AdDF = spark.sql(
         "SELECT "
             + "date,"
             + "province,"
             + "ad_id,"
             + "click_count "
           + "FROM ( "
             + "SELECT "
               + "date,"
               + "province,"
               + "ad_id,"
               + "click_count,"
               + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "
             + "FROM tmp_daily_ad_click_count_by_prov "
             + ") t "
           + "WHERE rank<=3"
       )
    
       provinceTop3AdDF.rdd
     }
    
     // 每次都是刷新出来各个省份最热门的top3广告,将其中的数据批量更新到MySQL中
     rowsDStream.foreachRDD{ rdd =>
       rdd.foreachPartition{ items =>
    
         // 插入数据库
         val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
    
         for (item <- items){
           val date = item.getString(0)
           val province = item.getString(1)
           val adid = item.getLong(2)
           val clickCount = item.getLong(3)
           adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)
         }
         AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
    
       }
     }
    复制代码

    }

2.6 实时统计每天每个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

  • 版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

  • 转化为时间粒度进行过滤,抛弃province,userId ,cityid,格式为: <yyyyMMddHHMM_adid,1L>,基于reduceByKeyAndWindow进行聚合

  • 最终结果展开 (date,hour,minute,adid,clickCount)

    calculateAdClickCountByWindow(adRealTimeValueDStream)
      def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {
    
      // 映射成<yyyyMMddHHMM_adid,1L>格式
      //(timestamp province city userid adid)
      val pairDStream = adRealTimeValueDStream.map{ case consumerRecord  =>
        val logSplited = consumerRecord.split(" ")
        val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))
        val adid = logSplited(4).toLong
    
        (timeMinute + "_" + adid, 1L)
      }
    
      // 计算窗口函数,1小时滑动窗口内的广告点击趋势
      val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))
    
      // 最近1小时内,各分钟的点击量,并保存到数据库
      aggrRDD.foreachRDD{ rdd =>
        rdd.foreachPartition{ items =>
          //保存到数据库
          val adClickTrends = ArrayBuffer[AdClickTrend]()
          for (item <- items){
            val keySplited = item._1.split("_")
            // yyyyMMddHHmm
            val dateMinute = keySplited(0)
            val adid = keySplited(1).toLong
            val clickCount = item._2
    
            val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
            val hour = dateMinute.substring(8, 10)
            val minute = dateMinute.substring(10)
    
            adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)
          }
          AdClickTrendDAO.updateBatch(adClickTrends.toArray)
        }
      }
    }
    复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Software Paradigms

Software Paradigms

Stephen H. Kaisler / Wiley-Interscience / 2005-03-17 / USD 93.95

Software Paradigms provides the first complete compilation of software paradigms commonly used to develop large software applications, with coverage ranging from discrete problems to full-scale applic......一起来看看 《Software Paradigms》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器