内容简介:DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部
Spark SQL的架构图
Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用此额外信息来执行额外的优化
Spark SQL执行计划生成和优化都由Catalyst完成
DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。
DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和 Java 中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来表示DataFrame。
RDD 优点
- JVM对象组成的分布式数据集合
- 不可变并且有容错能力
- 可处理机构化和非结构化的数据
- 支持函数式转换
RDD缺点
- 没有Schema
- 用户自己优化程序
- 从不同的数据源读取数据非常困难
- 合并多个数据源中的数据也非常困难
DataFrame API
- Row对象组成的分布式数据集
- 不可变并且有容错能力
- 处理结构化数据
- 自带优化器Catalyset,可自动优化程序
- Data source API
DataFrame让Spark对结构化数据有了处理能力
DataFrame的缺点:
1.编译时不能类型转化安全检查,运行时才能确定是否有问题
2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象
DateSet的优点:
1.DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据
2.和RDD一样,支持自定义对象存储
3.和DataFrame一样,支持结构化数据的 sql 查询
4.采用堆外内存存储,gc友好
5.类型转化安全,代码友好
foreach 在Executor端遍历
cache
persist //持久化
printSchema
toDF
unpersist //清除持久化的
创建SparkSession 命名为spark 下面使用
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
利用反射机制推断RDD模式
在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame
代码片段
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder import spark.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame //定义一个case class case class Person(name: String, age: Long) //转换成DataFrame val peopleDF = spark.sparkContext .textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes =>Person(attributes(0),attributes(1).trim.toInt)) .toDF() //必须注册为临时表才能供下面的查询使用 peopleDF.createOrReplaceTempView("people") val personsRDD = spark.sql("select name,age from people where age > 20") //最终生成一个DataFrame,下面是系统执行返回的信息 //personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint] personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show() //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值 //下面是系统执行返回的信息 +------------------+ | value| +------------------+ |Name:Michael,Age:29| | Name:Andy,Age:30| +------------------+
当无法提前定义case class时,就需要采用编程方式定义RDD模式。
代码片段
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row //生成字段 val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true)) val schema = StructType(fields) //从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段 //shcema就是“表头” val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") //对peopleRDD 这个RDD中的每一行元素都进行解析 val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim.toInt)) //上面得到的rowRDD就是“表中的记录” //下面把“表头”和“表中的记录”拼装起来 val peopleDF = spark.createDataFrame(rowRDD, schema) //必须注册为临时表才能供下面查询使用 peopleDF.createOrReplaceTempView("people") val results = spark.sql("SELECT name,age FROM people") results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show() +--------------------+ | value| +--------------------+ |name: Michael,age:29| | name: Andy,age:30| | name: Justin,age:19|
下面的spark代表SparkSession读写json数据
val peopleDF = spark.read.format("json").load("/people.json") 或者 val peopleDF = spark.read.json("/people.json") peopleDF.select("name", "age").write.format("json").save("namesAndAges.json")
读写parquet数据
val peopleDF = spark.read.format("parquet").load("/people.parquet") 或者 val peopleDF = spark.read.parquet("/people.parquet") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
读取text数据
spark.read.text(…) // 返 回 Dataset<Row> 或者 spark.read.textFile(…) // 返 回 Dataset<String>
读取写jdbc数据 如:mysql
val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/spark") .option("driver","com.mysql.jdbc.Driver"). .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save()
Spark SQL调优
DataFrame缓存
dataFrame.cache()
Reduce task数目: spark.sql.shuffle.partitions (默认是200)可以根据自己的情况设置partitions个数
读数据时每个Partition大小:spark.sql.files.maxPartitionBytes(默认128MB)
小文件合并读: spark.sql.files.openCostInBytes (默认是4194304 (4 MB) )
广播小表大小: spark.sql.autoBroadcastJoinThreshold(默认是10485760 (10 MB))
以上所述就是小编给大家介绍的《Spark SQL与DataSet》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。