内容简介:Spark SQL支持两种方式将现有RDD转换为DataFrame。第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet
Spark SQL支持两种方式将现有RDD转换为DataFrame。
第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。
第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet
方法如下 1.将RDD转换成Rows 2.按照第一步Rows的结构定义StructType 3.基于rows和StructType使用createDataFrame创建相应的DF
测试数据为order.data
1 小王 电视 12 2015-08-01 09:08:31 1 小王 冰箱 24 2015-08-01 09:08:14 2 小李 空调 12 2015-09-02 09:01:31
代码如下:
object RDD2DF { /** * 主要有两种方式 * 第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF * 第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema * @param args */ def main(args: Array[String]): Unit = { val spark=SparkSession.builder() .appName("DFDemo") .master("local[2]") .getOrCreate() // rdd2DFFunc1(spark) rdd2DFFunc2(spark) spark.stop() } /** * 提前定义好case class * @param spark */ def rdd2DFFunc1(spark:SparkSession): Unit ={ import spark.implicits._ val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data") val orderDF=orderRDD.map(_.split("\t")) .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4))) .toDF() orderDF.show() Thread.sleep(1000000) } /** *总结:第二种方式就是通过最基础的DF接口方法,将 * @param spark */ def rdd2DFFunc2(spark:SparkSession): Unit ={ //TODO: 1.将RDD转换成Rows 2.按照第一步Rows的结构定义StructType 3.基于rows和StructType使用createDataFrame创建相应的DF val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data") //TODO: 1.将RDD转换成Rows val rowsRDD=orderRDD // .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res}) .map(_.split("\t")) .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4))) //TODO: 2.按照第一步Rows的结构定义StructType val schemaString="id|name|commodity|age|date" val fields=schemaString.split("\\|") .map(filedName=>StructField(filedName,StringType,nullable = true)) val schema=StructType(fields) //TODO: 3.基于rows和StructType使用createDataFrame创建相应的DF val orderDF= spark.createDataFrame(rowsRDD,schema) orderDF.show() orderDF.groupBy("name").count().show() orderDF.select("name","commodity").show() Thread.sleep(10000000) } } case class Order(id:String,name:String,commodity:String,age:String,date:String)
生产中创建DataFrame代码举例
在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,因为我们生产中很难提前定义case class ,因为业务处理之后字段常常会发生意想不到的变化,所以一定要掌握这种方法。
测试数据
baidu CN A E [01/May/2018:02:15:52 +0800] 2 61.237.59.0 - 112.29.213.35:80 0 movieshow2000.edu.chinaren.com GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 16374 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568 baidu CN A E [01/May/2018:02:25:33 +0800] 2 61.232.37.228 - 112.29.213.35:80 0 github.com GET http://github.com/user_upload/15316339776271/44y.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 83552 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
Schema方法类
import org.apache.spark.sql.Row import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} object LogConverUtil { private val struct=StructType( Array( StructField("domain",StringType) ,StructField("url",StringType) ,StructField("pv",LongType) ,StructField("traffic",LongType) ,StructField("date",StringType) ) ) def getStruct():StructType={ struct } def parseLog(logLine:String): Row ={ val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH) val targetFormat=new SimpleDateFormat("yyyyMMddhh") try{ val fields=logLine.split("\t") val domain=fields(10) val url=fields(12) val pv=1L val traffic=fields(19).trim.toLong val date=getFormatedDate(fields(4),sourceFormat,targetFormat) Row(domain,url,pv,traffic,date) }catch { case e:Exception=>Row(0) } } /** * * @param sourceDate Log中的未格式化日期 [01/May/2018:01:09:45 +0800] * @return 按照需求格式化字段 2018050101 */ def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = { val targetTime=targetFormat.format(sourceFormat.parse(sourceDate)) targetTime } }
RDD2DataFrame主类
import org.apache.spark.sql.SparkSession object SparkCleanJob { def main(args: Array[String]): Unit = { val spark=SparkSession.builder() .master("local[2]") .appName("SparkCleanJob") .getOrCreate() val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log") // logRDD.take(2).foreach(println(_)) //调用LogConverUtil里的parseLog方法和getStruct方法获得Rows对象和StructType对象 val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct()) logDF.show(false) logDF.printSchema() } }
结果
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+ |domain |url |pv |traffic|date | +------------------------------+-------------------------------------------------------------------------+---+-------+----------+ |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 |1 |16374 |2018050102| |github.com |http://github.com/user_upload/15316339776271/44y.mp4 |1 |83552 |2018050102| |yooku.com |http://yooku.com/user_upload/15316339776271x0.html |1 |74986 |2018050101| |rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4 |1 |55297 |2018050101| |github.com |http://github.com/user_upload/15316339776271x05.mp4 |1 |26812 |2018050102| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html |1 |50392 |2018050103| |github.com |http://github.com/user_upload/15316339776271x15.html |1 |40092 |2018050101| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4 |1 |8368 |2018050102| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1 |29677 |2018050103| |rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4 |1 |26124 |2018050102| |yooku.com |http://yooku.com/user_upload/15316339776271yz.mp4 |1 |32219 |2018050101| |yooku.com |http://yooku.com/user_upload/153163397762713w.html |1 |90389 |2018050101| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html |1 |15623 |2018050101| |yooku.com |http://yooku.com/user_upload/1531633977627142.html |1 |53453 |2018050103| |yooku.com |http://yooku.com/user_upload/15316339776271230.mp4 |1 |20309 |2018050102| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1 |87804 |2018050103| |movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1 |69469 |2018050103| |yooku.com |http://yooku.com/user_upload/15316339776271011/.mp4 |1 |3782 |2018050103| |github.com |http://github.com/user_upload/15316339776271wzw.mp4 |1 |89642 |2018050102| |github.com |http://github.com/user_upload/15316339776271/1/.mp4 |1 |63551 |2018050103| +------------------------------+-------------------------------------------------------------------------+---+-------+----------+ only showing top 20 rows root |-- domain: string (nullable = true) |-- url: string (nullable = true) |-- pv: long (nullable = true) |-- traffic: long (nullable = true) |-- date: string (nullable = true) Process finished with exit code 0
注:除了这种使用RDD读取文本进而转化成DataFrame之外,我们也会使用自定义DefaultSource来直接将text转化成DataFrame
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- python – 将dict列表转换为dicts字典的优雅方式
- JavaScript进阶系列-类型转换、隐式类型转换
- Android 多国语言转换 Excel 和 Excel 转换为 string
- [SSL证书转换(一)]关于JKS 转换成 CRT 和 KEY
- c++中几种常见的类型转换。int与string的转换,float与string的转换以及string和long类型之间的相互...
- Protocol Buffer使用转换工具将proto文件转换成Java文件流程及使用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
高性能Python
(美)戈雷利克、(英)欧日沃尔德 / 东南大学出版社 / 2015-2
你的Python代码也许运行正确,但是你需要运行得更快速。通过探讨隐藏在设计备选方案中的基础理论,戈雷利克和欧日沃尔德编著的《高性能Python》将帮助你更深入地理解Python的实现。你将了解如何定位性能瓶颈,从而显著提升高数据流量程序中的代码执行效率。 你该如何利用多核架构和集群?或者你该如何搭建一个可以自由伸缩而不会影响可靠性的系统?有经验的Python程序员将会学习到这类问题的具体解......一起来看看 《高性能Python》 这本书的介绍吧!