内容简介:今天,这个话题开启的原因是很多球友或者粉丝都私聊过浪尖,关于Spark任务运行时发生不可序列话的问题。所以,浪尖今天就统一讲解一下这块的内容。首先,要先读懂scala的闭包是怎么回事儿。可以参考我前面的文章
今天,这个话题开启的原因是很多球友或者粉丝都私聊过浪尖,关于Spark任务运行时发生不可序列话的问题。所以,浪尖今天就统一讲解一下这块的内容。
首先,要先读懂scala的闭包是怎么回事儿。可以参考我前面的文章
接着就是要理解Spark 算子闭包生成及我们编写的闭包执行的原理。接下来我们就拿map和mapPartition两个算子来开启本文讲解:
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
对于任务划分,调度,执行,结果返回的原理浪尖就不在这里扩展了,浪尖在知识星球里分享过一套Spark 源码的视频,可以参考阅读。
map和mapPartitions的区别面试常考的,对于两者的区别从源码里看很明显,一个是f被迭代器迭代调用,一个是f的参数是迭代器。浪尖很早以前发过一篇文章,彻底讲解过foreach和foreachPartition的区别。可以参考理解
Spark源码系列之foreach和foreachPartition的区别
回到正题,之所以会发生不可序列化的错误,主要原因是传递给map的f函数不是在driver端执行的,所以会被序列化传输到executor节点,然后在executor节点反序列化然后执行。假如f函数里引用了map外部不可序列化的对象就会报不可序列化的异常。
但是,很多时候我们并没有直接去在闭包里使用不可序列化的对象,这个时候报异常就有点不合适了。比如下面的例子:
* class SomethingNotSerializable { * def someValue = 1 * def scope(name: String)(body: => Unit) = body * def someMethod(): Unit = scope("one") { * def x = someValue * def y = 2 * scope("two") { println(y + 1) } * } * }
此示例中,scope(two) 不可序列化,因为它引用了scope(one)(通过y),而scope(one)引用了SomethingNotSerializable(通过someValue)。但是,其实scope(two)并不直接依赖于SomethingNotSerializable。假如这种情况下抛出不可序列化异常就不科学了,所以Spark会对闭包进行一些清理操作,也即是本文中所要讲的。
主要 工具 类是ClosureCleaner。 该工具的主要作用是遍历闭包的层次结构,并且将没有被闭包实际引用的链路设置为 null ,但是仍然包含在已经编译的匿名类中。请注意直接修改封闭中的闭包是不安全的,因为可能有其他代码路径会依赖于他们。所以,我们会克隆封闭中的闭包并且相应地设置父指针。
默认情况下,可以传递清除闭包。这就意味着,我们需要检测封闭对象是否由起始对象实际引用,(要么直接引用要么间接引用),如果没有被实际使用则从层次结构中切断这些闭包。换句话说,除了清空无用字段的引用之外,也会将没有被起始闭包引用的引用封闭对象的父指针清空。传递性的确定是通过遍历闭包所调用的
再回到前面的例子, scope(two) 不可序列化,因为它引用了scope(one)(通过y),而scope(one)引用了SomethingNotSerializable(通过someValue)。但是,其实scope(two)并不直接依赖于SomethingNotSerializable。这就意味着我们可以安全的将其副本scope(one)的父指针清空,同时将其设置为scope(two)的父级,这样scope(two)就不再需要间接传递引用SomethingNotSerializable了。
解决方法
实现序列化是最直接的,假如不能的话。那就读下面的话:
那么为了不实现序列化还能尽量避免不可序列化错误,就不要在map等算子里引用外部变量,而是直接在算子中实例化,假如每次实例化代价高,那就使用mapPartitions。
还不懂,下次就只能上源码了~
推荐阅读:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。