spark简明笔记

栏目: 编程工具 · 发布时间: 5年前

内容简介:以scala为例,我们通过IDE编写Spark应用后,将其打包成jar包,然后使用spark-submit程序进行部署优先级从高到低依次是:RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,可能是文件,也可以是hadoop。通过RDD可以进行tranformation和action操作,从而实现分布式计算。

一、Spark结构

spark简明笔记
  • 使用 java 、scala、 python 任意一种语言编写的Spark应用叫Driver
  • Driver程序一般负责初始SparkContext,然后通过SparkContext与整个集群通信,进行分布式计算,比如通过SparkContext创建RDD。鉴于Driver行驶的地位,其角色上有可叫central coordinator
  • SparkContext与集群通信的方式  第一步先通过Cluster Manager申请计算资源Executor 第二步,SparkContext与Executor直接通信,将分布式计算程序发送到每个Executor 第三步,SparkContext发送当前要执行的计算Task给Executor执行  
  • WorkerNode是Spark集群中的某个具体节点,也叫slave
  • Executor是在Worker Node上开的一个应用执行器,他会在worknode上起一个JVM, 他可以执行多个Task, Executor是应用隔离的。也即一个Executor只能属于某一个Spark应用,这样Spark集群才能同时服务多个Spark应用,互不干扰。
  • Executor有点像Java中的工作线程一样,可以执行SparkContext发来的多个任务。不同的是Executor是一个独立的JVM进程
  • Cluster Manager是有多种类型,可以是Spark自带的Standalone 集群,也可以是YARN或Mesos集群

二、如何部署Spark程序

以scala为例,我们通过IDE编写Spark应用后,将其打包成jar包,然后使用spark-submit程序进行部署

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
  • --class: 应用的主方法入口
  • --master: cluster manager集群地址,可以是local,也可以是yarn或mesos,或者spark自带的standalone 地址
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any

2.1 上述master参数可配置的值如下

spark简明笔记

2.2 Spark配置优先级

优先级从高到低依次是:

  • 直接在代码中通过SparkConf控制,比如指定cluster manager的master参数,可以在代码中配置 val conf = new SparkConf().setAppName("WordCount").setMaster("local");
  • 在命令中指定,比如: ./bin/spark-submit  --class org.apache.spark.examples.SparkPi  --master yarn  --deploy-mode cluster \  # can be client for client mode --executor-memory 20G  --num-executors 50  /path/to/examples.jar  1000
  • 在spark的安装目录下,通过spark-defaults.conf配置。

三、RDD

RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,可能是文件,也可以是hadoop。通过RDD可以进行tranformation和action操作,从而实现分布式计算。

3.1 关键数据结构

一个RDD具有以下固定的数据结构

  • 需要应用的计算操作,也即transformation
  • 当前RDD对应的分区列表。因为数据是分区存储的
  • 当前RDD依赖的父数据集。每个RDD都维护一个其依赖关系,这就构成了一个亲缘图谱叫做DAG(Directed Acyclic Graph),中文称作有向无环图。

总结来说,一个RDD的关键信息无非是,定义了数据来源,数据分布存储的情况,以及准备执行的计算逻辑。通过这些新,我们可以构建一个图,图的两个vertex分别是RDD,edge为computation

private[spark] def conf = sc.conf
  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]//当前RDD需要执行的计算

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]//当前RDD对应的分区

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps//当前RDD依赖的父亲数据集

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null //当前RDD的名称

3.2 RDD 特点

  • 分布式
  • 不可变性,一个RDD生成后,就不可变,所有Transformation操作,都是在原RDD基础上生成新的RDD。
  • 自动容错特性,Spark RDD记录了数据谱系信息(Data lineage),也即check point。这样在某步失败后,可以直接重试那一步,而不用所有计算过程重来。谱系信息记录了,输入的数据,以及处理函数。由于RDD的不可变特性,以及处理函数的幂等性,使得整个重试不会有side effect。依然能保持计算的一致性
  • 没有性能优化DataFrame会根据用户的sql,自动做性能优化。而RDD要求用户自己组织transformation atcion代码,可能用户组织的不合理,会导致数据频繁在集群间移动
  • 没有结构化信息DataFrame有字段的名称,类型等信息,但RDD没有
  • Lazy Comuting.只有action时,前面所有的transformation动作才会执行。这节约了空间和时间。试想,如果每个transformation都单独执行一次,那每一次的计算调度都有时间成本,以及中间结果的存储成本

四、Spark的计算流程

  • driver创建RDD
  • RDD通过SparkContext的runJob方法,提交一次数据计算
  • SparkContext最终又交由DAGScheduler的runJob进行计算job执行
  • DAGScheduler使用handleJobSubmitted方法处理job,第一步是根据RDD中的DAG构建Stage列表。不涉及数据移动的transformation会被放到一个stage里面,比如filter和map操作,他们可以并行的在各分区中执行。第二步通过submitStage提交Stage到集群
  • DAGSchedulersubmitStage再调用submitMissingTasks方法。submitMissingTasks会将stage转化成task
  • task最后通过TaskScheduler提交到spark集群的worknode,进行实际执行
spark简明笔记

五、RDD Transformation

将RDD进行一系列变换,生成新的RDD的过程,叫做Transformation。所有那些可以就地计算,而不需要数据迁移的transformation叫做Narrow Transformation。

5.1 transformation大概源码

以map操作为例

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //将传进来的函数f进行clean,这里先不深究,只需要知道clean后的函数,跟原函数功能相同
    
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    //这里返回MapPartitionsRDD对象,其构造参数为当前RDD和一个将f应用于迭代器的函数定义
  }
  • 可以看到map方法并没有执行任何函数,而只是将所有计算过程和数据包装成MapPartitionsRDD后返回。这也就是transformation操作,lazy Computing的特点所在。
  • 所有tranformation返回的都是RDD,比如filter。其余transfomation 函数源码大致同map类似,不再赘述

5.2 flatMap

map操作是将迭代RDD中的每个元素,然后将其做一定加工,返回的的依然是一个元素。而flapMap接受的函数参数的入参是RDD中的每个元素,但对该元素处理后,返回的是一个集合,而不是一个元素。flatMap源码如下:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

总结来说,map和flapMap的异同点如下:

- map接受的函数参数签名是:(f: T => U)而flatMap接受的函数参数签名为:(f: T => TraversableOnce[U]),可以看到返回的是集合

  • map和flatMap的返回值都是RDD[T]。也即是说,flatMap拥有将多个集合数据合并,抹平的功效,从该函数的命名也可看出这一点,flat是平的意思

5.2 Narrow Transformations

Narrow Transformation操作有

spark简明笔记

5.3 Wide Transformations

有些计算,需要依赖其他节点数据,这种计算会导致数据移动,成为Wide Transformations。比如,基于某个key分类的操作GroupByKey,这个Key可能散落在不同的work node上,为了进行GroupByKey计算,需要计算节点间进行数据移动,比如将某个Key对应的数据,统一移动到一个节点上。Wide Transformation操作有如下:

spark简明笔记

六、RDD Action

所有Tranformation操作,都不会真正执行,直到Action操作被调用,Action操作返回是具体值,而不是RDD。这种特性成为Lazy Computing.

Action操作触发后,会将执行结果发给Driver 或者写如到外部存储。以下操作属于Action操作:

First(), take(), reduce(), collect(),  count()

6.1 关键action源码

所有action操作,最终都会调用SparkContext的runJob方法。runJob有需多重载方法,以其中一个为例

def runJob[T, U: ClassTag](
      rdd: RDD[T],//需要处理的RDD数据
      processPartition: Iterator[T] => U,//需要在每个数据分区上进行的操作
      resultHandler: (Int, U) => Unit)//如何将上述每个分区处理后的结果进行处理

可以看到runJob中体现了所有分布式计算理论架构,即MapReduce。其中processPartition定义每个分区要需要做的map操作,这一步将减少数据量,将map操作的结果做为输入,传进reduce操作,进行汇总处理。

6.2 aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
    sc.runJob(this, aggregatePartition, mergeResult)//4
    jobResult//5
  }
  1. 定义一个结果汇总变量,它将存储aggregate方法最终的返回结果,初始值为zeroValue
  2. 在每个RDD数据分区上,使用迭代器,应用aggregate方法,初始值为都为zeroValue
  3. 对每个分区的结果,使用combOp方法进行汇总计算。输入index为分区的编号,taskResult为上一步每个分区计算后的结果,同汇总变量jobResult再来进行combOp计算。从第一步可知,jobResult的初始值为zeroValue
  4. 将上述两个函数作为入参,传递给sc.runJob方法,在spark集群进行执行
  5. 返回结果

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.aggregate(3)(
      (acc, value) => {
        println(acc+":"+value)
        (acc + value._2)
      },
      (acc1, acc2) => (acc1 * acc2)
    )
    println(result)//输出4032

解释:

  • 上述RDD,被切分成两个分区。第一个分区数据是("maths", 21),另一个是:("english", 22),("science", 31)
  • (acc + value._2)是每个分区要执行的操作,迭代器带入zeroValue=3后,两个分片的计算中间值如下 3+21=24//分区1 3+22+31=56//分区2
  • 最后将每个分区结果带入(acc1 * acc2)函数,从aggregate源码得知,结果计算也要运用zeroValue,在这里也就是3.于是最终步执行的计算如下: 3 24 56=4032

6.3 fold

fold函数同aggregate类似,同样是调用SparkContext的runJob函数,只不过fold只接受一个值参数,和一个函数参数,其内部在调用runJob时,分区计算和结果计算都使用同样的函数。源码如下:

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
  (acc, ele) => {
    println(acc+":"+ele)
    ("result",acc._2 + ele._2)
  }
)
println(result)//输出:(result,83)

假设注释1中切分的2个分区为("maths", 21)和("english", 22),("science", 31),那么执行过程如下:

  1. 3+21=24
  2. 3+22+31=56
  3. 3+24+56=83

6.4 reduce

reduce同样调用了SparkContext的runJob函数,但reduce接收的参数在fold上进一步简化,少了zeroValue参数,只接收一个函数参数即可。同样该参数,在调用runJob时,即作为分区收敛的函数,记作为分区汇总计算的函数

def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.reduce(
      (acc, ele) => {
        println(acc+":"+ele)
        ("result",acc._2 + ele._2)
      }
    )
    println(result)//结果为:(result,74)

6.5 collect&top

collect和top方法都会将数据收集到driver本地,前者是收集全部,后者是收集指定条数。所以最好知道收集的数据集较小时使用。否则会有很大的性能问题,比如大数量的传输,以及driver本地的内存压力

6.6 reduce和reduceByKey

前者是action操作,后者是transformation操作

七、RDD cache优化

RDD的数据,来至于外部存储介质,比如磁盘。而每一次用该RDD,都要去磁盘加载,这有时间和性能上的损耗。可以使用rdd的cahce方法,将该RDD缓存到内存,这样后续重复使用该RDD时,直接去内存拿。

cache的几个级别

  • MEMORY_ONLY 只缓存到内存,内存装不下的部分,下次用到时再重新计算
  • MEMORY_AND_DISK缓存到内存,内存装不下的,缓存到磁盘。这样,下次需要时,不在内存部分的数据直接从磁盘获取就行,不用重新计算
  • MEMORY_ONLY_SER只缓存到内存,但为了节约空间,将缓存对象序列化后存储
  • MEMORY_AND_DISK_SER缓存到内存,装不下的数据缓存到磁盘,都是用序列化方式存储
  • DISK_ONLY只缓存到磁盘

7.1 stage

按数据是否在分区间迁移,来划分stage。一个stage有多个task,他们会并发的在不同的分区上执行相同的计算代码。比如紧邻的map和filter就会被划在同一个stage,因为他们可以并发在各分区上执行,而不需要数据移动。而reduceByKey则会单独成为一个stage,因为其涉及到数据移动

八、RDD  lineage& DAG

RDD

从一个RDD转化成另一个RDD时,每一步都会记录上一个RDD关系。于是这形成一个血统谱系。具体

val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    println(wordCount1.toDebugString)

最终输出:

(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
 +-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
    |  MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
    |  InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
    |  InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []

可以看到结果以倒序的方式输出,有点像java异常时,打出的依赖栈。从最近的依赖点,一直回溯

九、DataFrame

在RDD上进一步封装的数据结构。这种数据结构可以使用SparkSql去操作处理数据,这降低了对分布式数据集的使用难度。因为你只要会sql,就可以进行一些处理

十、GraphX

十一、 如何调优

一个Spark应用最会对应多个JVM进程。分布式driver,以及该应用在每个worknode上起的JVM进程,由于driver担任的协调者角色,实际执行是worknode上的EXECUTOR,所以对于JVM的调优,主要指对Executor的调优。这些JVM进程彼此会通信,比如数据shuffle。所以优化Spark应用的思路主要从以下个方面入手:

  • 做个一个JVM应用,需要关注JVM的垃圾回收情况,各年龄带的内存分配。这个需要基于具体应用具体分析
  • 由于多个JVM进程之间设计跨网络,跨机器的数据传输,那么需要考虑如何减小传输数据量。比如将数据序列化
  • 对于Spark计算框架本身的特点,还有对数据量较大的输入,采用提高并发度,来切分输入大小。频繁使用的RDD,进行缓存,减小集群重复计算加载的开销。将各分区都要用到的公共大变量,提前brodcast到各集群等

11.1 序列化

通过sparkConf conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)来配置,指定数据对象的序列化方式

十二、参考资料


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro CSS and HTML Design Patterns

Pro CSS and HTML Design Patterns

Michael Bowers / Apress / April 23, 2007 / $44.99

Design patterns have been used with great success in software programming. They improve productivity, creativity, and efficiency in web design and development, and they reduce code bloat and complexit......一起来看看 《Pro CSS and HTML Design Patterns》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具