Spark SQL与DataSet

栏目: 服务器 · 发布时间: 6年前

内容简介:DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部

Spark SQL的架构图

Spark SQL与DataSet

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用此额外信息来执行额外的优化

Spark SQL执行计划生成和优化都由Catalyst完成

DataSet是分布式数据集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以被构造从JVM对象,然后使用功能性的转换(操作map,flatMap,filter等等)。

DataFrame是组织为命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和 Java 中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来表示DataFrame。

RDD 优点

  • JVM对象组成的分布式数据集合
  • 不可变并且有容错能力
  • 可处理机构化和非结构化的数据
  • 支持函数式转换

RDD缺点

  • 没有Schema
  • 用户自己优化程序
  • 从不同的数据源读取数据非常困难
  • 合并多个数据源中的数据也非常困难

DataFrame API

  • Row对象组成的分布式数据集
  • 不可变并且有容错能力
  • 处理结构化数据
  • 自带优化器Catalyset,可自动优化程序
  • Data source API
    DataFrame让Spark对结构化数据有了处理能力

DataFrame的缺点:

1.编译时不能类型转化安全检查,运行时才能确定是否有问题

2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象

DateSet的优点:

1.DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据

2.和RDD一样,支持自定义对象存储

3.和DataFrame一样,支持结构化数据的 sql 查询

4.采用堆外内存存储,gc友好

5.类型转化安全,代码友好

foreach 在Executor端遍历

cache

persist //持久化

printSchema

toDF

unpersist //清除持久化的

创建SparkSession 命名为spark 下面使用

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame

代码片段

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder 
import spark.implicits._  //导入包,支持把一个RDD隐式转换为一个DataFrame
//定义一个case class
case class Person(name: String, age: Long)  
//转换成DataFrame
val peopleDF = spark.sparkContext
                .textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
                .map(_.split(","))
                .map(attributes =>Person(attributes(0),attributes(1).trim.toInt))
                .toDF()
//必须注册为临时表才能供下面的查询使用
peopleDF.createOrReplaceTempView("people") 
val personsRDD = spark.sql("select name,age from people where age > 20")
//最终生成一个DataFrame,下面是系统执行返回的信息
//personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()  //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
//下面是系统执行返回的信息
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

当无法提前定义case class时,就需要采用编程方式定义RDD模式。

代码片段

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//生成字段
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//shcema就是“表头”
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim.toInt))
//上面得到的rowRDD就是“表中的记录”
//下面把“表头”和“表中的记录”拼装起来
 val peopleDF = spark.createDataFrame(rowRDD, schema)
 //必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|

下面的spark代表SparkSession读写json数据

val peopleDF = spark.read.format("json").load("/people.json")
或者 
val peopleDF = spark.read.json("/people.json")

peopleDF.select("name", "age").write.format("json").save("namesAndAges.json")

读写parquet数据

val peopleDF = spark.read.format("parquet").load("/people.parquet")
或者
val peopleDF = spark.read.parquet("/people.parquet")

peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

读取text数据

spark.read.text(…) // 返 回 Dataset<Row>
或者
 spark.read.textFile(…) // 返 回 Dataset<String>

读取写jdbc数据 如:mysql

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")
  .option("driver","com.mysql.jdbc.Driver").
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

Spark SQL调优

DataFrame缓存

dataFrame.cache()

Reduce task数目: spark.sql.shuffle.partitions (默认是200)可以根据自己的情况设置partitions个数

读数据时每个Partition大小:spark.sql.files.maxPartitionBytes(默认128MB)

小文件合并读: spark.sql.files.openCostInBytes (默认是4194304 (4 MB) )

广播小表大小: spark.sql.autoBroadcastJoinThreshold(默认是10485760 (10 MB))


以上所述就是小编给大家介绍的《Spark SQL与DataSet》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

TensorFlow实战

TensorFlow实战

黄文坚、唐源 / 电子工业出版社 / 2017-2-1 / 79

Google近日发布了TensorFlow 1.0候选版,这个稳定版将是深度学习框架发展中的里程碑的一步。自TensorFlow于2015年底正式开源,距今已有一年多,这期间TensorFlow不断给人以惊喜,推出了分布式版本,服务框架TensorFlow Serving,可视化工具TensorFlow,上层封装TF.Learn,其他语言(Go、Java、Rust、Haskell)的绑定、Wind......一起来看看 《TensorFlow实战》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具