Spark SQL与DataSet

栏目: 服务器 · 发布时间: 7年前

内容简介:DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部

Spark SQL的架构图

Spark SQL与DataSet

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用此额外信息来执行额外的优化

Spark SQL执行计划生成和优化都由Catalyst完成

DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。

DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和 Java 中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来表示DataFrame。

RDD 优点

  • JVM对象组成的分布式数据集合
  • 不可变并且有容错能力
  • 可处理机构化和非结构化的数据
  • 支持函数式转换

RDD缺点

  • 没有Schema
  • 用户自己优化程序
  • 从不同的数据源读取数据非常困难
  • 合并多个数据源中的数据也非常困难

DataFrame API

  • Row对象组成的分布式数据集
  • 不可变并且有容错能力
  • 处理结构化数据
  • 自带优化器Catalyset,可自动优化程序
  • Data source API
    DataFrame让Spark对结构化数据有了处理能力

DataFrame的缺点:

1.编译时不能类型转化安全检查,运行时才能确定是否有问题

2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象

DateSet的优点:

1.DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据

2.和RDD一样,支持自定义对象存储

3.和DataFrame一样,支持结构化数据的 sql 查询

4.采用堆外内存存储,gc友好

5.类型转化安全,代码友好

foreach 在Executor端遍历

cache

persist //持久化

printSchema

toDF

unpersist //清除持久化的

创建SparkSession 命名为spark 下面使用

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame

代码片段

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder 
import spark.implicits._  //导入包,支持把一个RDD隐式转换为一个DataFrame
//定义一个case class
case class Person(name: String, age: Long)  
//转换成DataFrame
val peopleDF = spark.sparkContext
                .textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
                .map(_.split(","))
                .map(attributes =>Person(attributes(0),attributes(1).trim.toInt))
                .toDF()
//必须注册为临时表才能供下面的查询使用
peopleDF.createOrReplaceTempView("people") 
val personsRDD = spark.sql("select name,age from people where age > 20")
//最终生成一个DataFrame,下面是系统执行返回的信息
//personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()  //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
//下面是系统执行返回的信息
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

当无法提前定义case class时,就需要采用编程方式定义RDD模式。

代码片段

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//生成字段
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//shcema就是“表头”
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim.toInt))
//上面得到的rowRDD就是“表中的记录”
//下面把“表头”和“表中的记录”拼装起来
 val peopleDF = spark.createDataFrame(rowRDD, schema)
 //必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|

下面的spark代表SparkSession读写json数据

val peopleDF = spark.read.format("json").load("/people.json")
或者 
val peopleDF = spark.read.json("/people.json")

peopleDF.select("name", "age").write.format("json").save("namesAndAges.json")

读写parquet数据

val peopleDF = spark.read.format("parquet").load("/people.parquet")
或者
val peopleDF = spark.read.parquet("/people.parquet")

peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

读取text数据

spark.read.text(…) // 返 回 Dataset<Row>
或者
 spark.read.textFile(…) // 返 回 Dataset<String>

读取写jdbc数据 如:mysql

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")
  .option("driver","com.mysql.jdbc.Driver").
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

Spark SQL调优

DataFrame缓存

dataFrame.cache()

Reduce task数目: spark.sql.shuffle.partitions (默认是200)可以根据自己的情况设置partitions个数

读数据时每个Partition大小:spark.sql.files.maxPartitionBytes(默认128MB)

小文件合并读: spark.sql.files.openCostInBytes (默认是4194304 (4 MB) )

广播小表大小: spark.sql.autoBroadcastJoinThreshold(默认是10485760 (10 MB))


以上所述就是小编给大家介绍的《Spark SQL与DataSet》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Haskell

Haskell

Simon Thompson / Addison-Wesley / 1999-3-16 / GBP 40.99

The second edition of Haskell: The Craft of Functional Programming is essential reading for beginners to functional programming and newcomers to the Haskell programming language. The emphasis is on th......一起来看看 《Haskell》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换