Spark RDD转换成DataFrame的两种方式

栏目: 服务器 · 发布时间: 6年前

内容简介:Spark SQL支持两种方式将现有RDD转换为DataFrame。第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet

Spark SQL支持两种方式将现有RDD转换为DataFrame。

第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。

第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet

方法如下
         1.将RDD转换成Rows   
         2.按照第一步Rows的结构定义StructType  
         3.基于rows和StructType使用createDataFrame创建相应的DF

测试数据为order.data

1   小王  电视  12  2015-08-01 09:08:31
1   小王  冰箱  24  2015-08-01 09:08:14
2   小李  空调  12  2015-09-02 09:01:31

代码如下:

object RDD2DF {

  /**
    * 主要有两种方式
    *   第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF
    *   第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val spark=SparkSession.builder()
      .appName("DFDemo")
      .master("local[2]")
      .getOrCreate()

//    rdd2DFFunc1(spark)

    rdd2DFFunc2(spark)
    spark.stop()
  }

  /**
    * 提前定义好case class
    * @param spark
    */
  def rdd2DFFunc1(spark:SparkSession): Unit ={
    import spark.implicits._
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
    val orderDF=orderRDD.map(_.split("\t"))
      .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
      .toDF()
    orderDF.show()
    Thread.sleep(1000000)
  }

  /**
    *总结:第二种方式就是通过最基础的DF接口方法,将
    * @param spark
    */
  def rdd2DFFunc2(spark:SparkSession): Unit ={
    //TODO:   1.将RDD转换成Rows   2.按照第一步Rows的结构定义StructType  3.基于rows和StructType使用createDataFrame创建相应的DF
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")

    //TODO:   1.将RDD转换成Rows
    val rowsRDD=orderRDD
//      .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})
      .map(_.split("\t"))
      .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))

    //TODO:   2.按照第一步Rows的结构定义StructType
    val schemaString="id|name|commodity|age|date"
    val fields=schemaString.split("\\|")
      .map(filedName=>StructField(filedName,StringType,nullable = true))
    val schema=StructType(fields)

    //TODO:   3.基于rows和StructType使用createDataFrame创建相应的DF
   val orderDF= spark.createDataFrame(rowsRDD,schema)
    orderDF.show()
    orderDF.groupBy("name").count().show()
    orderDF.select("name","commodity").show()
    Thread.sleep(10000000)
  }
}
case class Order(id:String,name:String,commodity:String,age:String,date:String)

生产中创建DataFrame代码举例

在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,因为我们生产中很难提前定义case class ,因为业务处理之后字段常常会发生意想不到的变化,所以一定要掌握这种方法。

测试数据

baidu   CN  A   E   [01/May/2018:02:15:52 +0800]    2   61.237.59.0 -   112.29.213.35:80    0   movieshow2000.edu.chinaren.com  GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   16374   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568
baidu   CN  A   E   [01/May/2018:02:25:33 +0800]    2   61.232.37.228   -   112.29.213.35:80    0   github.com  GET http://github.com/user_upload/15316339776271/44y.mp4    HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   83552   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568

Schema方法类

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object LogConverUtil {

  private val struct=StructType(
    Array(
      StructField("domain",StringType)
      ,StructField("url",StringType)
      ,StructField("pv",LongType)
      ,StructField("traffic",LongType)
      ,StructField("date",StringType)
    )
  )
  def getStruct():StructType={
    struct
  }

  def parseLog(logLine:String): Row ={
    val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH)
    val targetFormat=new SimpleDateFormat("yyyyMMddhh")

    try{

      val fields=logLine.split("\t")
      val domain=fields(10)
      val url=fields(12)
      val pv=1L
      val traffic=fields(19).trim.toLong
      val date=getFormatedDate(fields(4),sourceFormat,targetFormat)

      Row(domain,url,pv,traffic,date)
    }catch {
      case e:Exception=>Row(0)
    }
  }
  /**
    *
    * @param sourceDate  Log中的未格式化日期   [01/May/2018:01:09:45 +0800]
    * @return  按照需求格式化字段      2018050101
    */
  def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = {
    val targetTime=targetFormat.format(sourceFormat.parse(sourceDate))
    targetTime
  }

}

RDD2DataFrame主类

import org.apache.spark.sql.SparkSession

object SparkCleanJob {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local[2]")
      .appName("SparkCleanJob")
      .getOrCreate()

    val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")
//    logRDD.take(2).foreach(println(_))

    //调用LogConverUtil里的parseLog方法和getStruct方法获得Rows对象和StructType对象
    val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct())
    logDF.show(false)
    logDF.printSchema()
  }
}

结果

+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|domain                        |url                                                                      |pv |traffic|date      |
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4  |1  |16374  |2018050102|
|github.com                    |http://github.com/user_upload/15316339776271/44y.mp4                     |1  |83552  |2018050102|
|yooku.com                     |http://yooku.com/user_upload/15316339776271x0.html                       |1  |74986  |2018050101|
|rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4                 |1  |55297  |2018050101|
|github.com                    |http://github.com/user_upload/15316339776271x05.mp4                      |1  |26812  |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html  |1  |50392  |2018050103|
|github.com                    |http://github.com/user_upload/15316339776271x15.html                     |1  |40092  |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4   |1  |8368   |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1  |29677  |2018050103|
|rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4                  |1  |26124  |2018050102|
|yooku.com                     |http://yooku.com/user_upload/15316339776271yz.mp4                        |1  |32219  |2018050101|
|yooku.com                     |http://yooku.com/user_upload/153163397762713w.html                       |1  |90389  |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html  |1  |15623  |2018050101|
|yooku.com                     |http://yooku.com/user_upload/1531633977627142.html                       |1  |53453  |2018050103|
|yooku.com                     |http://yooku.com/user_upload/15316339776271230.mp4                       |1  |20309  |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1  |87804  |2018050103|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1  |69469  |2018050103|
|yooku.com                     |http://yooku.com/user_upload/15316339776271011/.mp4                      |1  |3782   |2018050103|
|github.com                    |http://github.com/user_upload/15316339776271wzw.mp4                      |1  |89642  |2018050102|
|github.com                    |http://github.com/user_upload/15316339776271/1/.mp4                      |1  |63551  |2018050103|
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
only showing top 20 rows

root
 |-- domain: string (nullable = true)
 |-- url: string (nullable = true)
 |-- pv: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- date: string (nullable = true)

Process finished with exit code 0

注:除了这种使用RDD读取文本进而转化成DataFrame之外,我们也会使用自定义DefaultSource来直接将text转化成DataFrame


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Beginning XSLT 2.0

Beginning XSLT 2.0

Jeni Tennison / Apress / 2005-07-22 / USD 49.99

This is an updated revision of Tennison's "Beginning XSLT", updated for the new revision of the XSLT standard. XSLT is a technology used to transform an XML document with one structure into another ......一起来看看 《Beginning XSLT 2.0》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具