Spark 的核心概念 RDD

栏目: 服务器 · 发布时间: 6年前

内容简介:RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中最基本的抽象,它代表一个不可变、可分区、里面元素可以并行计算的集合。RDD 具有数据流模型特点:自动容错、位置感知性调度和可伸缩。RDD 允许用户在执行多个查询时,显示地将工作集缓存在内存中,后续的查询能够重用工作集,这将会极大的提升查询的效率。

RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中最基本的抽象,它代表一个不可变、可分区、里面元素可以并行计算的集合。

RDD 具有数据流模型特点:自动容错、位置感知性调度和可伸缩。

RDD 允许用户在执行多个查询时,显示地将工作集缓存在内存中,后续的查询能够重用工作集,这将会极大的提升查询的效率。

我们可以认为 RDD 就是一个代理,我们操作这个代理就像操作本地集合一样,不需去关心任务调度、容错等问题。

1.2 RDD 的属性

在 RDD 源码中这样来描述 RDD

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

复制代码
  1. 一组分片(Partition),即数据集的基本组成单位 。 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  2. 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度 。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  3. RDD 之间互相存在依赖关系 。 RDD 的每次转换都会生成一个新的 RDD ,所以 RDD 之前就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失部分的分区数据,而不是对 RDD 的所有分区进行重新计算。
  4. 一个Partitioner ,即 RDD 的分片函数 。当前Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner ,另外一个是基于范围的 RangePartitioner。只有对于key-value的RDD ,才会有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函数不但决定了RDD 本身的分片数量,也决定了 Parent RDD Shuffle 输出时的分片数量。
  5. 一个列表,存储存取每个Partition 的优先位置(preferred location) 。 对于一个HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块位置。安装“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

2 创建 RDD

2.1 由一个存在的 Scala 集合进行创建

#通过并行化scala集合创建RDD,一般在测试的时候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
复制代码

2.2 由外部的存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、Hbase

var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
复制代码

2.3 调用一个已经存在了的RDD 的 Transformation,会生成一个新的 RDD。

3 RDD 的编程 API

3.1 Transformation

这种 RDD 中的所有转换都是延迟加载的,也就是说,他们并不会直接就计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个返回结果的 Driver 的动作时,这些操作才会真正的运行。这种设计会让Spark 更加有效率的运行。

常用的 Transformation 操作:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)( + , + ) 对k/y的RDD进行操作
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行 排序 的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 调用外部程序
coalesce(numPartitions) 重新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false ;少分区变多分区 true ; 多分区变少分区 false
repartition(numPartitions) 重新分区 必须shuffle 参数是要分多少区 少变多
repartitionAndSortWithinPartitions(partitioner) 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作

3.2 Action

触发代码的运行操作,我们一个Spark 应用,至少需要一个 Action 操作。

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
foreachPartition(func) 在每个分区上,运行函数 func

3.3 Spark WordCount 代码示例

执行流程图:

Spark 的核心概念 RDD

pom.xml 依赖

<!-- 导入scala的依赖 -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- 指定hadoop-client API的版本 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
复制代码

scala 版本代码实现:

package com.zhouq.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * scala 版本实现 wc
  *
  */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //这行代码是因为我在windows 上直接跑,需要去读取 hadoop 上的文件,设置我的用户名。如果是 linux  环境可以不设置。视情况而定
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建spark 配置,设置应用程序名字
//    val conf = new SparkConf().setAppName("scalaWordCount")
    val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")

//    conf.set("spark.testing.memory","102457600")
    //创建spark 执行的入口
    val sc = new SparkContext(conf)

    //指定以后从哪里读取数据创建RDD (弹性分布式数据集)
    //取到一行数据
    val lines: RDD[String] = sc.textFile(args(0))

    //切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    //按单词和一组合
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //按key 进行聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 排序, false 表示倒序
    val sorted = reduced.sortBy(_._2, false)

    //将结果保存到hdfs中
    sorted.saveAsTextFile(args(1))

    //释放资源
    sc.stop()
  }
}
复制代码

Java7 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
* Java 版WordCount
*/
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //创建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception{
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String tp) throws Exception {
                return new Tuple2<>(tp, 1);
            }
        });

        //先交换再排序,因为 只有groupByKey 方法
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
//                return new Tuple2<>(tp._2, tp._1);
                return tp.swap();
            }
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

Java8 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

/**
* Java Lambda 表达式版本的  WordCount
*/
public class JavaLambdaWordCount {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //创建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
//        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
//        words.mapToPair(tp -> new Tuple2<>(tp,1));
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));

        //先交换再排序,因为 只有groupByKey 方法
//        swaped.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
//                return new Tuple2<>(tp._2, tp._1);
            return tp.swap();
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
//        sorted.mapToPair(tp -> tp.swap());
        JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

4 RDD 的依赖关系

RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不同的类型,即 窄依赖(narrow dependency)和宽依赖(wide dependency)。

Spark 的核心概念 RDD

窄依赖:窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个分区使用。可以比喻为独生子女。 宽依赖 :宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition

5 RDD 的持久化

5.1 RDD 的 cache(持久化)

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用persist()或cache()方法标记要保留的RDD 。第一次在动作中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用最初创建它的转换自动重新计算。

5.2 什么时候我们需要持久化?

  1. 要求的计算速度快
  2. 集群的资源要足够大
  3. 重要: cache 的数据会多次触发Action
  4. 建议先进行数据过滤,然后将缩小范围后的数据再cache 到内存中.

5.3 如何使用

使用 rdd.persist()或者rdd.cache()

val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法来缓存数据到内存
val cache = lines.cache()
//注意查看下面两次count 的时间
cached.count
cached.count

复制代码

5.4 数据缓存的存储级别 StorageLevel

我们在 StorageLevel.scala 源码中可以看到:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码

解释一下各个参数的意思:

第一个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以 Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障导致分区数据丢失,当重新分配任务时,去另外的机器读取备份数据进行重新计算)

OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中

5.5 如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  1. 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
  2. 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
  3. 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
  4. 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
    1. 它运行多个执行者共享Tachyon中相同的内存池
    2. 它显著地减少垃圾回收的花费
    3. 如果单个的执行者崩溃,缓存的数据不会丢失

5.6 删除 cache

Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用 RDD.unpersist()方法

5.7 RDD 的 checkpoint机制

我们除了把数据缓存到内存中,还可以把数据缓存到HDFS 中,保证中间数据不丢失.

什么时候我们需要做chechpoint?

  1. 做复杂的迭代计算,要求保证数据安全,不丢失
  2. 对速度要求不高(跟 cache 到内存进行对比)
  3. 将中间结果保存到 hdfs 中

怎么做 checkpoint ?

首先设置 checkpoint 目录,然后再执行计算逻辑,再执行 checkpoint() 操作。

下面代码使用cache 和 checkpoint 两种方式实现计算每门课最受欢迎老师的 topN

package com.zhouq.spark

import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求每门课程最受欢迎老师TopN  --2
  *   -- 使用cache
  *   -- 使用checkpoint 一般设置hdfs 目录
  */
object GroupFavTeacher2_cache_checkpoint {
  def main(args: Array[String]): Unit = {
    //前 N
    val topN = args(1).toInt
    //学科集合
    val subjects = Array("bigdata", "javaee", "php")
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
    //创建spark 执行入口
    val sc = new SparkContext(conf)
    //checkpoint 得先设置 sc 的checkpoint 的dir
//    sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")

    //指定读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      var teacher = line.substring(index + 1)
      var httpHost = line.substring(0, index)
      var subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })
    //将学科,老师联合当做key
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)

    //第一种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 以后被反复使用,才使用cache
    val cached: RDD[((String, String), Int)] = reduced.cache()

    //第二种 使用checkpoint,得先设置 sc 的 checkpointDir
//   val cached: RDD[((String, String), Int)] = reduced.checkpoint()

    /**
      * 先对学科进行过滤,然后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的情况.
      * take 是Action 操作,每次take 都会进行一次任务提交,具体查看日志打印情况
      */
    for (sub <- subjects) {
      //过滤出当前的学科
      val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
      //使用RDD 的 sortBy ,内存+磁盘排序,避免scala 中的排序因内存不足导致异常情况.
      //take 是Action 的,所以每次循环都会触发一次提交任务,祥见日志打印情况
      val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
      println(favTeacher.toBuffer)
    }

    /**
      * 前面cache的数据已经计算完了,后面还有很多其他的指标要计算
      * 后面计算的指标也要触发很多次Action,最好将数据缓存到内存
      * 原来的数据占用着内存,把原来的数据释放掉,才能缓存新的数据
      */

    //把原来缓存的数据释放掉
    cached.unpersist(true)

    sc.stop()
  }
}
复制代码

6 DAG 的生成

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖, 由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

Spark 的核心概念 RDD

微信公众号文章链接:Spark RDD

有兴趣欢迎关注,大家一起交流学习。

Spark 的核心概念 RDD

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro JavaScript Design Patterns

Pro JavaScript Design Patterns

Dustin Diaz、Ross Harmes / Apress / 2007-12-16 / USD 44.99

As a web developer, you’ll already know that JavaScript™ is a powerful language, allowing you to add an impressive array of dynamic functionality to otherwise static web sites. But there is more power......一起来看看 《Pro JavaScript Design Patterns》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具