Spark综合使用及电商案例实战精析-Spark商业应用实战

栏目: 编程工具 · 发布时间: 6年前

内容简介:cache()和 persist()的区别在于, cache()是 persist()的一种简化方式, cache()的底 层就是调用的 persist()的无参版本,同时就是调用 persist(MEMORY_ONLY),将输 入持久化到内存中。如果需要从内存中清除缓存,那么可以使用 unpersist()方法。广播变量允许程序员在每个机器上保留缓存的只读变量,而不是给每个任务发 送一个副本。 例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据 集的副本。 Spark 还尝试使用高效的广播算法
Spark综合使用及电商案例实战精析-Spark商业应用实战

2 Spark RDD 持久化

cache()和 persist()的区别在于, cache()是 persist()的一种简化方式, cache()的底 层就是调用的 persist()的无参版本,同时就是调用 persist(MEMORY_ONLY),将输 入持久化到内存中。如果需要从内存中清除缓存,那么可以使用 unpersist()方法。

3 Spark 广播变量

广播变量允许 程序员 在每个机器上保留缓存的只读变量,而不是给每个任务发 送一个副本。 例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据 集的副本。 Spark 还尝试使用高效的广播算法分发广播变量,以降低通信成本。

Spark 提供的 Broadcast Variable 是只读的,并且在每个节点上只会有一个副本, 而不会为每个 task 都拷贝一份副本,因此, 它的最大作用,就是减少变量到各个节 点的网络传输消耗,以及在各个节点上的内存消耗。此外, Spark 内部也使用了高效 的广播算法来减少网络消耗。

4 Spark 累加器

累加器(accumulator): Accumulator 是仅仅被相关操作累加的变量,因此可以 在并行中被有效地支持。它们可用于实现计数器(如 MapReduce)或总和计数。 Accumulator 是存在于 Driver 端的,从节点不断把值发到 Driver 端,在 Driver 端计数(Spark UI 在 SparkContext 创建时被创建,即在 Driver 端被创建,因此它可 以读取 Accumulator 的数值), 累加器是存在于 Driver 端的一个值,从节点是读取不到的。

Spark 提供的 Accumulator 主要用于多个节点对一个变量进行共享性的操作。 Accumulator 只提供了累加的功能,但是却给我们提供了多个 task 对于同一个变量 并行操作的功能,但是 task 只能对 Accumulator 进行累加操作,不能读取它的值, 只有 Driver 程序可以读取 Accumulator 的值。

class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String,
    Int]] {
    
        // 保存所有聚合数据
        private val aggrStatMap = mutable.HashMap[String, Int]()
        
        // 判断是否为初始值
        override def isZero: Boolean = {
        aggrStatMap.isEmpty
        }
        
        // 复制累加器
        override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
            val newAcc = new SessionAggrStatAccumulator
            aggrStatMap.synchronized{
            newAcc.aggrStatMap ++= this.aggrStatMap
            }
            newAcc
        }
        
        // 重置累加器中的值
        override def reset(): Unit = {
             aggrStatMap.clear()
        }
        
        // 向累加器中添加另一个值
        override def add(v: String): Unit = {
            if (!aggrStatMap.contains(v))
                aggrStatMap += (v -> 0)
            aggrStatMap.update(v, aggrStatMap(v) + 1)
        }
        
        // 各个 task 的累加器进行合并的方法
        // 合并另一个类型相同的累加器
        override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit =
        {
            other match {
            case acc:SessionAggrStatAccumulator => {
            (this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k,
            0)) )}
        }
        }
        }
        
        // 获取累加器中的值
        // AccumulatorV2 对外访问的数据结果
        override def value: mutable.HashMap[String, Int] = {
            this.aggrStatMap
        }
    }
复制代码

5 Spark将DataFrame插入到Hive表中

  • DataFrame保存到Hive表中

    // 1:ArrayBuffer[ProductInfo]生成
           private def mockProductInfo(): Array[ProductInfo] = {
              val rows = ArrayBuffer[ProductInfo]()
              val random = new Random()
              val productStatus = Array(0, 1)
          
              for (i <- 0 to 100) {
                val productId = i
                val productName = "product" + i
                val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}"
          
                rows += ProductInfo(productId, productName, extendInfo)
              }
              rows.toArray
            }
          
          // 2:模拟数据
          val userVisitActionData = this.mockUserVisitActionData()
          val userInfoData = this.mockUserInfo()
          val productInfoData = this.mockProductInfo()
    
          // 3:将模拟数据装换为RDD
          val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
          val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
          val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)
      
          // 4:加载SparkSQL的隐式转换支持
          import spark.implicits._
      
          // 5:将用户访问数据装换为DF保存到Hive表中
          val userVisitActionDF = userVisitActionRdd.toDF()
          insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)
      
          // 6:将用户信息数据转换为DF保存到Hive表中
          val userInfoDF = userInfoRdd.toDF()
          insertHive(spark, USER_INFO_TABLE, userInfoDF)
      
          // 7:将产品信息数据转换为DF保存到Hive表中
          val productInfoDF = productInfoRdd.toDF()
          insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)
          
          // 8:将DataFrame插入到Hive表中
          private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
              spark.sql("DROP TABLE IF EXISTS " + tableName)
              dataDF.write.saveAsTable(tableName)
            }
    复制代码
  • DataSet 与 RDD 互操作

    1.通过编程获取 Schema:通过 spark 内部的 StructType 方式,将普通的 RDD 转换成 DataFrame。 
      object SparkRDDtoDF {
      
      def rddToDF(sparkSession:SparkSession):DataFrame = {
      
          //设置 schema 结构
          val schema = StructType(
              Seq(
              StructField("name",StringType,true),
              StructField("age",IntegerType,true)
              )
          )
          
          val rowRDD = sparkSession.sparkContext
          .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
          .map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
          sparkSession.createDataFrame(rowRDD,schema)
      }
      
    
      2.通过反射获取 Schema:使用 case class 的方式,不过在 scala 2.10 中最大支持 22 个字段的 case class,这点需要注意;
      
      case class Person(name:String,age:Int)
      def rddToDFCase(sparkSession : SparkSession):DataFrame = {
      
          //导入隐饰操作,否则 RDD 无法调用 toDF 方法
          import sparkSession.implicits._
          val peopleRDD = sparkSession.sparkContext
          .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
          .map( x => x.split(",")).map( x => Person(x(0),x(1).trim().toInt)).toDF()
          peopleRDD
      }
      
      3 Main函数
      def main(agrs : Array[String]):Unit = {
              val conf = new SparkConf().setMaster("local[2]")
              conf.set("spark.sql.warehouse.dir","file:/E:/scala_workspace/z_spark_study/")
              conf.set("spark.sql.shuffle.partitions","20")
              
              val sparkSession = SparkSession.builder().appName("RDD to DataFrame")
              .config(conf).getOrCreate()
              
              // 通过代码的方式,设置 Spark log4j 的级别
              sparkSession.sparkContext.setLogLevel("WARN")
              
              import sparkSession.implicits._
              
              //使用 case class 的方式
              //val peopleDF = rddToDFCase(sparkSession)
              
              // 通过编程的方式完成 RDD 向
              val peopleDF = rddToDF(sparkSession)
              peopleDF.show()
              peopleDF.select($"name",$"age").filter($"age">20).show()
              }
          }
    复制代码
  • 4 DataFrame/DataSet 转 RDD

    val rdd1=testDF.rdd
    val rdd2=testDS.rdd
    复制代码
  • 5 RDD 转 DataFrame

    import spark.implicits._
      val testDF = rdd.map {line=>
      (line._1,line._2)
      }.toDF("col1","col2")
    复制代码
  • 6 DataSet 转 DataFrame

    import spark.implicits._
      val testDF = testDS.toDF
    复制代码
  • 7 DataFrame 转 DataSet

    import spark.implicits._
      //定义字段名和类型
      case class Coltest(col1:String, col2:Int) extends Serializable
      val testDS = testDF.as[Coltest]
    复制代码

6 用户自定义聚合函数(UDAF)

    1. 弱类型 UDAF 函数

      /**
         * 用户自定义聚合函数
         */
         class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {
         
             /**
             * 聚合函数输入参数的数据类型
             */
             override def inputSchema: StructType = StructType(StructField("cityInfo", StringType) ::
             Nil)
             
             /**
             * 聚合缓冲区中值的类型
             * 中间进行聚合时所处理的数据类型
             */
             override def bufferSchema: StructType = StructType(StructField("bufferCityInfo",
             StringType) :: Nil)
             
             /**
             * 函数返回值的数据类型
             */
             override def dataType: DataType = StringType
             
             /**
             * 一致性检验,如果为 true,那么输入不变的情况下计算的结果也是不变的
             */
             override def deterministic: Boolean = true
             
             /**
             * 设置聚合中间 buffer 的初始值
             * 需要保证这个语义:两个初始 buffer 调用下面实现的 merge 方法后也应该为初始 buffer 即如果你初始值是
             1,然后你 merge 是执行一个相加的动作,两个初始 buffer 合并之后等于 2,不会等于初始 buffer 了。这样的初始
             值就是有问题的,所以初始值也叫"zero value"
             */
             override def initialize(buffer: MutableAggregationBuffer): Unit = {
             buffer(0)= ""
             }
             
             /**
             * 用输入数据 input 更新 buffer 值,类似于 combineByKey
             */
             override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                     // 缓冲中的已经拼接过的城市信息串
                     var bufferCityInfo = buffer.getString(0)
                     // 刚刚传递进来的某个城市信息
                     val cityInfo = input.getString(0)
                     // 在这里要实现去重的逻辑
                     // 判断:之前没有拼接过某个城市信息,那么这里才可以接下去拼接新的城市信息
                     if(!bufferCityInfo.contains(cityInfo)) {
                         if("".equals(bufferCityInfo))
                         bufferCityInfo += cityInfo
                         else {
                         // 比如 1:北京
                         // 1:北京,2:上海
                         bufferCityInfo += "," + cityInfo
                     }
                     buffer.update(0, bufferCityInfo)
                  }
             }
             /**
             * 合并两个 buffer,将 buffer2 合并到 buffer1.在合并两个分区聚合结果的时候会被用到,类似于
             reduceByKey
             * 这里要注意该方法没有返回值,在实现的时候是把 buffer2 合并到 buffer1 中去,你需要实现这个合并细节
             */
             override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                 var bufferCityInfo1 = buffer1.getString(0);
                 val bufferCityInfo2 = buffer2.getString(0);
                 for(cityInfo <- bufferCityInfo2.split(",")) {
                         if(!bufferCityInfo1.contains(cityInfo)) {
                         if("".equals(bufferCityInfo1)) {
                         bufferCityInfo1 += cityInfo;
                         } else {
                         bufferCityInfo1 += "," + cityInfo;
                         }
                         }
                         }
                         buffer1.update(0, bufferCityInfo1);
                 }
                 /**
                 * 计算并返回最终的聚合结果
                 */
                 override def evaluate(buffer: Row): Any = {
                 buffer.getString(0)
             }
         }
      复制代码
    1. 强类型 UDAF 函数

      // 定义 case 类
         case class Employee(name: String, salary: Long)
         case class Average(var sum: Long, var count: Long)
         
         object MyAverage extends Aggregator[Employee, Average, Double] {
        
             /**
             * 计算并返回最终的聚合结果
             */
             def zero: Average = Average(0L, 0L)
             
             /**
             * 根据传入的参数值更新 buffer 值
             */
             def reduce(buffer: Average, employee: Employee): Average = {
                 buffer.sum += employee.salary
                 buffer.count += 1
                 buffer
             }
             
             /**
             * 合并两个 buffer 值,将 buffer2 的值合并到 buffer1
             */
             def merge(b1: Average, b2: Average): Average = {
                 b1.sum += b2.sum
                 b1.count += b2.count
                 b1
             }
             
             /**
             * 计算输出
             */
             def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
             
             /**
             * 设定中间值类型的编码器,要转换成 case 类
             * Encoders.product 是进行 scala 元组和 case 类转换的编码器
             */
             def bufferEncoder: Encoder[Average] = Encoders.product
             
             /**
             * 设定最终输出值的编码器
             */
             def outputEncoder: Encoder[Double] = Encoders.scalaDouble
         }
      复制代码

7 开窗函数

  • 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合), 它 对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中 同时返回基础行的列和聚合列。

  • 开窗函数的调用格式为: 函数名(列) OVER(选项)

    第一大类: 聚合开窗函数 -> 聚合函数(列) OVER (选项),这里的选项可以是
      PARTITION BY 子句,但不可是 ORDER BY 子句。
      
      def main(args: Array[String]): Unit = {
          val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
          val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
          import sparkSession.implicits._
          val scoreDF = sparkSession.sparkContext.makeRDD(Array(Score("a1", 1, 80),
          Score("a2", 1, 78),
          Score("a3", 1, 95),
          Score("a4", 2, 74),
          Score("a5", 2, 92),
          Score("a6", 3, 99),
          Score("a7", 3, 99),
          Score("a8", 3, 45),
          Score("a9", 3, 55),
          Score("a10", 3, 78))).toDF("name", "class
          ", "score")
          scoreDF.createOrReplaceTempView("score")
          scoreDF.show()
          }
      
      OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数
      sparkSession.sql("select name, class, score, count(name) over() name_count from score")
      
      PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。
      sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()
      
      |name|class|score|name_count|
      +----+-----+-----+----------+
      | a1| 1| 80| 3|
      | a2| 1| 78| 3|
      | a3| 1| 95| 3|
      | a6| 3| 99| 5|
      | a7| 3| 99| 5|
      | a8| 3| 45| 5|
      | a9| 3| 55| 5|
      | a10| 3| 78| 5|
      | a4| 2| 74| 2|
      | a5| 2| 92| 2|
      +----+-----+-----+----------+
      
      第二大类: 排序开窗函数 -> 排序函数(列) OVER(选项),这里的选项可以是
      ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),
      但不可以是 PARTITION BY 子句。
      
      对于 排序 开窗函数来讲,它支持的开窗函数分别为: ROW_NUMBER(行号)、
      RANK(排名)、 DENSE_RANK(密集排名)和 NTILE(分组排名)。
      
      sparkSession.sql("select name, class, score, row_number() over(order by score) rank from
      score").show()
      
      +----+-----+-----+----+
      |name|class|score|rank|
      +----+-----+-----+----+
      | a8| 3| 45| 1|
      | a9| 3| 55| 2|
      | a4| 2| 74| 3|
      | a2| 1| 78| 4|
      | a10| 3| 78| 5|
      | a1| 1| 80| 6|
      | a5| 2| 92| 7|
      | a3| 1| 95| 8|
      | a6| 3| 99| 9|
      | a7| 3| 99| 10|
      +----+-----+-----+----+
      
      sparkSession.sql("select name, class, score, rank() over(order by score) rank from
      score").show()
      
      +----+-----+-----+----+
      |name|class|score|rank|
      +----+-----+-----+----+
      | a8| 3| 45| 1|
      | a9| 3| 55| 2|
      | a4| 2| 74| 3|
      | a2| 1| 78| 4|
      | a10| 3| 78| 4|
      | a1| 1| 80| 6|
      | a5| 2| 92| 7|
      | a3| 1| 95| 8|
      | a6| 3| 99| 9|
      | a7| 3| 99| 9|
      +----+-----+-----+----+
      
      sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from
      score").show()
      
      ----+-----+-----+----+
      |name|class|score|rank|
      +----+-----+-----+----+
      | a8| 3| 45| 1|
      | a9| 3| 55| 2|
      | a4| 2| 74| 3|
      | a2| 1| 78| 4|
      | a10| 3| 78| 4|
      | a1| 1| 80| 5|
      | a5| 2| 92| 6|
      | a3| 1| 95| 7|
      | a6| 3| 99| 8|
      | a7| 3| 99| 8|
      +----+-----+-----+----+
      
      sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from
      score").show()
      
      +----+-----+-----+----+
      |name|class|score|rank|
      +----+-----+-----+----+
      | a8| 3| 45| 1|
      | a9| 3| 55| 1|
      | a4| 2| 74| 2|
      | a2| 1| 78| 2|
      | a10| 3| 78| 3|
      | a1| 1| 80| 3|
      | a5| 2| 92| 4|
      | a3| 1| 95| 4|
      | a6| 3| 99| 5|
      | a7| 3| 99| 6|
      +----+-----+-----+----+
    复制代码

8 Dstream updataStateByKey 算子(要求必须开启 Checkpoint 机制)

object updateStateByKeyWordCount {
    def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName("Wordcount")
            val ssc = new StreamingContext(conf, Seconds(1))
            
            ssc.checkpoint("hdfs://s100:8020/wordcount_checkpoint")
            val lines = ssc.socketTextStream("localhost", 9999)
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map(word => (word, 1))
            val wordCount = pairs.updateStateByKey((values:Seq[Int], state:Option[Int]) =>{
                var newValue = state.getOrElse(0)
                for(value <- values){
                    newValue += value
                }
                Option(newValue)
            })
            
            wordCount.print()
            ssc.start()
            ssc.awaitTermination()
            }
    }
复制代码

9 电商综合应用案例

9.1 原数据模型

  • 用户行为表模型(每一次Action点击都会生成多条记录,1个Session对应多个页面Id)
Spark综合使用及电商案例实战精析-Spark商业应用实战
  • 用户表
Spark综合使用及电商案例实战精析-Spark商业应用实战
  • 物品表
Spark综合使用及电商案例实战精析-Spark商业应用实战
1. 点击Session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:04:42,null,37,17,null,null,null,null,7
    2. 搜索Session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:29:50,重庆小面,-1,-1,null,null,null,null,1
    3. 下单Session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,6,2018-02-11 17:50:10,null,-1,-1,61,71,null,null,2
    4. 付款Session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,4,2018-02-11 17:18:24,null,-1,-1,null,null,83,17,1
复制代码
Spark综合使用及电商案例实战精析-Spark商业应用实战

9.2 数据处理模型

  • 用户访问行为模型(每一个 Session_Id对应一个用户,从而可以聚合一个用户的所有操作行为)

  • 一个 Session_Id 对应多个action_time,从而可以得出每一个Session的访问周期Visit_Length。

  • 一个 Session_Id 对应多个page_id,可以进一步统计出Step_Length 以及转化率等指标。

    Session_Id | Search_Keywords | Click_Category_Id | Visit_Length | Step_Length | Start_Time
    复制代码
  • 初步统计出每一个 Session_Id对应的Visit_Length和Step_Length

Spark综合使用及电商案例实战精析-Spark商业应用实战
  • 联合用户信息进行定制过滤后,通过累加器,统计出visit_length_ratio及step_length_ratio
    Spark综合使用及电商案例实战精析-Spark商业应用实战

9.3 累加器功能实现

  • 累加器在Driver端维护了一个Map,用于集中存储所有Sesson中(如:1s_3s或1_3_ratio等)的访问步长和访问时长占比累积数。

  • 每一个Sesson 包含了一种(如:1s_3s或1_3_ratio)特征。

    import org.apache.spark.util.AccumulatorV2
     import scala.collection.mutable
     
     /**
       * 自定义累加器
       */
     class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {
     
      // 保存所有聚合数据
      private val aggrStatMap = mutable.HashMap[String, Int]()
     
      override def isZero: Boolean = {
         aggrStatMap.isEmpty
       }
     
      override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
         val newAcc = new SessionAggrStatAccumulator
         aggrStatMap.synchronized{
           newAcc.aggrStatMap ++= this.aggrStatMap
         }
         newAcc
       }
     
       override def reset(): Unit = {
         aggrStatMap.clear()
       }
     
     
      mutable.HashMap[String, Int]()的更新操作
      override def add(v: String): Unit = {
         if (!aggrStatMap.contains(v))
           aggrStatMap += (v -> 0)
         aggrStatMap.update(v, aggrStatMap(v) + 1)
       }
     
       override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
         other match {
           case acc:SessionAggrStatAccumulator => {
             (this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k, 0)) )}
           }
         }
       }
     
       override def value: mutable.HashMap[String, Int] = {
         this.aggrStatMap
       }
     }
    复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java Message Service API Tutorial and Reference

Java Message Service API Tutorial and Reference

Hapner, Mark; Burridge, Rich; Sharma, Rahul / 2002-2 / $ 56.49

Java Message Service (JMS) represents a powerful solution for communicating between Java enterprise applications, software components, and legacy systems. In this authoritative tutorial and comprehens......一起来看看 《Java Message Service API Tutorial and Reference》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码