Spark学习笔记之RDD持久化(四)

栏目: 编程工具 · 发布时间: 7年前

内容简介:Spark学习笔记之RDD持久化(四)

一:SparkRDD持久化的有点

Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。

二:如何持久化

Spark通过 persist()或cache() 方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。实际上 cache()是使用persist()的快捷方法

首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术,如果RDD的任何一个分区丢失,它可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的 Java 对象持久化到内存中、在节点间复制集合或者存储集合到Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。 cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY 。完整的存储级别介绍如下图

Spark学习笔记之RDD持久化(四)

StorageLevel 源码

val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

说明: 上面"_2"代表的是份数,就是把持久化的数据存为2份

StorageLevel有五个属性分别是

    private var _useDisk: Boolean, //useDisk_是否使用磁盘
    private var _useMemory: Boolean, //useMemory_是否使用内存
    private var _useOffHeap: Boolean, //useOffHeap_是否使用堆外内存如:Tachyon,
    private var _deserialized: Boolean,//deserialized_是否进行反序列化
    private var _replication: Int = 1) //replication_备份数目。

三:存储级别的选择

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

1:如果你的RDD适合 默认的存储级别(MEMORY_ONLY) ,就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。

2:如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。

3:除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。

4:如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。

看下图

Spark学习笔记之RDD持久化(四)

注意只能设置一种:不然会抛异常: Cannot change storage level of an RDD after it was already assigned a level

异常源码如下

 private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

四:如何使用缓存

1: 调用rdd.persist(); 变量可以这样设置 如:rdd.persist(StorageLevel.MEMORY_ONLY); 这里使用了MEMORY_ONLY级别存储。当然也可以选择其他的如: rdd.persist(StorageLevel.DISK_ONLY());

2: 调用rdd.cache() 方法, cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的简写 ,效果和他一模一样的。

3: 调用rdd.unpersist()清除缓存

我通过一个demo看下

 public class SparkCacheDemo {
    private static JavaSparkContext sc;
    public static void main(String[] args) {
        List list = Arrays.asList(5, 4, 3, 2, 1, 6, 9);
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkCacheDemo");
        sc = new JavaSparkContext(conf);
        JavaRDD rdd = sc.parallelize(list);
        // rdd.persist(StorageLevel.DISK_ONLY()); //磁盘存储
        rdd.persist(StorageLevel.MEMORY_ONLY());//内存
        // rdd.persist(StorageLevel.MEMORY_ONLY_2()); //内存存储两份
        rdd.collect();
        rdd.collect();   //这里可以设置debug断点便于查看
        rdd.unpersist(); //清楚缓存
        rdd.collect();  //这里也可以设置debug断点便于查看
    }
}

启动后设置上面连个debug点  然后查看页面 http://127.0.0.1:4040/storage/ 可以看到相关信息 如下图

磁盘

Spark学习笔记之RDD持久化(四)

内存

Spark学习笔记之RDD持久化(四)

 五:缓存性能测试

我们知道StorageLevel.MEMORY_ONLY级别和不用缓存的级别相差10倍,我们一起来验证下,看代码

package com.demo.spark.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
public class SparkCacheTest {
    private static JavaSparkContext sc;
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkCacheTest");
        sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        noCache();
        cache();
        System.out.println("");
    }
    /**
     * 不用缓存
     * 
     * @Title: noCache
     * @author zhuhuipei
     * @date 2017年6月2日 下午4:22:01
     */
    public static void noCache() {
        JavaRDD rdd = sc.textFile("./test.txt");
        rdd.count();
        Long t1 = System.currentTimeMillis();
        System.out.println("noCache()=rdd.count()=" + rdd.count());
        Long t2 = System.currentTimeMillis();
        Long t2_t1 = t2 - t1;
        System.out.println("nocache()=" + t2_t1);
    }
    /**
     * 用缓存
     * @Title: cache
     * @author zhuhuipei
     * @date 2017年6月2日 下午5:03:51
     */
    public static void cache() {
        JavaRDD rdd = sc.textFile("./test.txt").persist(StorageLevel.MEMORY_ONLY());
        rdd.count();
        Long t1 = System.currentTimeMillis();
        System.out.println(" cache()=rdd.count()=" + rdd.count());
        Long t2 = System.currentTimeMillis();
        Long t2_t1 = t2 - t1;
        System.out.println("cache()=" + t2_t1);
    }
}

本人本地直接运行两者相差不大差不多在(3~5倍左右),并没有达到10倍性能,可能和数据量和运行环境有关系。

源码地址:

https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/cache/SparkCacheTest.java

https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/cache/SparkCacheDemo.java

测试文件: https://raw.githubusercontent.com/zhp8341/sparkdemo/master/test.txt 文件有点大16M 


以上所述就是小编给大家介绍的《Spark学习笔记之RDD持久化(四)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

产品经理的20堂必修课

产品经理的20堂必修课

徐建极 / 人民邮电出版社 / 2013-9-1 / 59.00元

《产品经理的20堂必修课》以作者八年的产品经理工作实践为基础,通过系统的理论结合丰富的实例的方法,全面地总结了作为一名互联网产品经理所应掌握的知识。 《产品经理的20堂必修课》分为三大部分。 讲产品:深入剖析互联网产品成功的要素,分别从需求导向、简单原则、产品运营、战略布局等维度,分析如何让产品在残酷的互联网竞争中脱颖而出。 讲方法:着重分析优秀的产品团队运作的工作方法和程序,详......一起来看看 《产品经理的20堂必修课》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换