Spark SQL | Spark,从入门到精通

栏目: 服务器 · 发布时间: 5年前

内容简介:熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。Spark SQL 在 Hive 兼容层面仅依赖 HQL par

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。

/ 发家史 / 

熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。

Spark SQL | Spark,从入门到精通

Spark SQL

Spark SQL  提供了多种接口:

  • Sql 文本;

  • dataset/dataframe api。

当然,相应的,也会有各种客户端:

  • sql 文本,可以用 thriftserver/spark-sql;

  • 编码,Dataframe/dataset/sql。

/ Dataframe/Dataset API 简介 / 

Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。

可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:

Spark SQL | Spark,从入门到精通

Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:

type DataFrame = Dataset[Row]

Spark SQL | Spark,从入门到精通

所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。

基本操作

val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)
df.show()
import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
spark.stop()

分区分桶排序

分桶 排序 保存hive表
df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
分区以parquet输出到指定目录
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分区分桶保存到hive表
df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

cube rullup pivot

cube
sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
rull up
sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
pivot 只能跟在groupby之后
sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

/ SQL 编程 / 

Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写  SQL 文本:

1. spark 代码

2. spark-sql的shell

3. thriftserver

支持 Spark SQL 自身的语法,同时也兼容 HSQL。

1. 编码

要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。

SQLContext

new SQLContext(SparkContext)

HiveContext

new HiveContext(spark.sparkContext)

SparkSession

不使用 hive 元数据:

val spark = SparkSession.builder()
 .config(sparkConf) .getOrCreate()

使用 hive 元数据:

val spark = SparkSession.builder()
 .config(sparkConf) .enableHiveSupport().getOrCreate()

使用

val df =spark.read.json("examples/src/main/resources/people.json") 
df.createOrReplaceTempView("people") 
spark.sql("SELECT * FROM people").show()

2. spark-sql 脚本

spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用

bin/spark-sql –help 查看配置参数。 

需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试

show tables;

select count(*) from student;

3. thriftserver

thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。

安装部署

/1 开启 hive 的 metastore

bin/hive --service metastore

/2 将配置文件复制到spark/conf/目录下

/3 thriftserver

sbin/start-thriftserver.sh --masteryarn  --deploy-mode client

对于 yarn 只支持 client 模式。

/4 启动 bin/beeline

/5 连接到 thriftserver

!connect jdbc:hive2://localhost:10001

/ 用户自定义函数 / 

1. UDF

定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:

val len = udf{(str:String) => str.length}
spark.udf.register("len",len)
val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()

2. UserDefinedAggregateFunction

定义一个 UDAF

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverageUDAF extends UserDefinedAggregateFunction {
 //Data types of input arguments of this aggregate function
 definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
 //Data types of values in the aggregation buffer
 defbufferSchema:StructType = {
   StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
 }
 //The data type of the returned value
 defdataType:DataType = DoubleType
 //Whether this function always returns the same output on the identical input
 defdeterministic: Boolean = true
 //Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
 // standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
 // the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
 // immutable.
 definitialize(buffer:MutableAggregationBuffer): Unit = {
   buffer(0) = 0L
   buffer(1) = 0L
 }
 //Updates the given aggregation buffer `buffer` with new input data from `input`
 defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
   if(!input.isNullAt(0)) {
     buffer(0) = buffer.getLong(0)+ input.getLong(0)
     buffer(1) = buffer.getLong(1)+ 1
   }
 }
 // Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
 defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
   buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
   buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
 }
 //Calculates the final result
 defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
}

使用 UDAF

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

3. Aggregator

定义一个 Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverageAggregator extends Aggregator[Employee, Average, Double] {

 // A zero value for this aggregation. Should satisfy the property that any b + zero = b
 def zero: Average = Average(0L, 0L)
 // Combine two values to produce a new value. For performance, the function may modify `buffer`
 // and return it instead of constructing a new object
 def reduce(buffer: Average, employee: Employee): Average = {
   buffer.sum += employee.salary
   buffer.count += 1
   buffer
 }
 // Merge two intermediate values
 def merge(b1: Average, b2: Average): Average = {
   b1.sum += b2.sum
   b1.count += b2.count
   b1
 }
 // Transform the output of the reduction
 def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
 // Specifies the Encoder for the intermediate value type
 def bufferEncoder: Encoder[Average] = Encoders.product
 // Specifies the Encoder for the final output value type
 def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

使用

spark.udf.register("myAverage2", MyAverageAggregator)
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

/ 数据源 / 

1. 通用的 laod/save 函数

可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。

2. Parquet 文件

val parquetFileDF =spark.read.parquet("people.parquet") 
peopleDF.write.parquet("people.parquet")

3. ORC 文件

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.write.mode("append").orc("/opt/outputorc/")
spark.read.orc("/opt/outputorc/*").show(1)

4. JSON

ds.write.mode("overwrite").json("/opt/outputjson/")
spark.read.json("/opt/outputjson/*").show()

5. Hive 表

spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。

val spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()

spark.sql("select count(*) from student").show()

6. JDBC

写入 mysql

wcdf.repartition(1).write.mode("append").option("user", "root")
 .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

mysql 里读

val fromMysql = spark.read.option("user", "root")
 .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

7. 自定义数据源

自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

class DefaultSource  extends DataSourceV2 with ReadSupport {

 def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class SimpleDataSourceReader extends DataSourceReader {

 def readSchema() = StructType(Array(StructField("value", StringType)))

 def createDataReaderFactories = {
   val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
   factoryList.add(new SimpleDataSourceReaderFactory())
   factoryList
 }
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

class SimpleDataSourceReaderFactory extends
 DataReaderFactory[Row] with DataReader[Row] {
 def createDataReader = new SimpleDataSourceReaderFactory()
 val values = Array("1", "2", "3", "4", "5")

 var index = 0

 def next = index < values.length

 def get = {
   val row = Row(values(index))
   index = index + 1
   row
 }

 def close() = Unit
}

使用

val simpleDf = spark.read
 .format("bigdata.spark.SparkSQL.DataSources")
 .load()

simpleDf.show()

/优化器及执行计划 / 

1. 流程简介

Spark SQL | Spark,从入门到精通

总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。

简单化成四个部分:

/1 analysis

Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。

/2 logical optimization

常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。

/3 physical planning

eg:SortExec 。

/4 Codegen

codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java 字节码,Eg: SortExec。

2. 自定义优化器

/1 实现

继承 Rule[LogicalPlan]

object MultiplyOptimizationRule extends Rule[LogicalPlan] {

   def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {

     case Multiply(left,right) if right.isInstanceOf[Literal] &&

       right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>

       println("=========> optimization of one applied")

       left

   }

 }
 
   spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

   val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")

   println("after optimization")

/2 注册

spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)

/3 使用

selectExpr("amountPaid* 1")

Spark SQL | Spark,从入门到精通

3. 自定义执行计划

/1 物理计划

继承 SparkLan 实现 doExecute 方法。

/2逻辑计划

继承 SparkStrategy 实现 apply。

case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan {

 override def children: Seq[SparkPlan] = Nil

 override protected def doExecute(): RDD[InternalRow] = {
   val row = org.apache.spark.sql.Row("hi",12L)
   val unsafeRow = toUnsafeRow(row, Array(org.apache.spark.sql.types.StringType,org.apache.spark.sql.types.LongType))
   sparkContext.parallelize(Seq(unsafeRow),1)
 }

 def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
   val converter = unsafeRowConverter(schema)
   converter(row)
 }

 def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
   val converter = org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(schema)
   (row: org.apache.spark.sql.Row) => {
     converter(org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[org.apache.spark.sql.catalyst.InternalRow])
   }
 }
}
case object NeverPlanned extends LeafNode {
 override def output: Seq[Attribute] = Nil
}

object TestStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] =
   plan match {
     case Project(pblist, child) =>
       println("mt fastOperator ------------>")
       FastOperator(pblist.map(_.toAttribute),planLater(child)) :: Nil
     case Union(children) =>
       println("mt union ========>")
       UnionExec(children.map(planLater)) :: Nil
     case LocalRelation(output, data, _) =>
       LocalTableScanExec(output, data):: Nil
     case _ => Nil
 }
}

/3 注册到 Spark 执行策略

spark.experimental.extraStrategies =Seq(countStrategy)

/4 使用

spark.sql("select count(*) fromtest")

Spark SQL | Spark,从入门到精通


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Art of Computer Programming, Volume 3

The Art of Computer Programming, Volume 3

Donald E. Knuth / Addison-Wesley Professional / 1998-05-04 / USD 74.99

Finally, after a wait of more than thirty-five years, the first part of Volume 4 is at last ready for publication. Check out the boxed set that brings together Volumes 1 - 4A in one elegant case, and ......一起来看看 《The Art of Computer Programming, Volume 3》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码