内容简介:今天,这个话题开启的原因是很多球友或者粉丝都私聊过浪尖,关于Spark任务运行时发生不可序列话的问题。所以,浪尖今天就统一讲解一下这块的内容。首先,要先读懂scala的闭包是怎么回事儿。可以参考我前面的文章
今天,这个话题开启的原因是很多球友或者粉丝都私聊过浪尖,关于Spark任务运行时发生不可序列话的问题。所以,浪尖今天就统一讲解一下这块的内容。
首先,要先读懂scala的闭包是怎么回事儿。可以参考我前面的文章
接着就是要理解Spark 算子闭包生成及我们编写的闭包执行的原理。接下来我们就拿map和mapPartition两个算子来开启本文讲解:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
对于任务划分,调度,执行,结果返回的原理浪尖就不在这里扩展了,浪尖在知识星球里分享过一套Spark 源码的视频,可以参考阅读。
map和mapPartitions的区别面试常考的,对于两者的区别从源码里看很明显,一个是f被迭代器迭代调用,一个是f的参数是迭代器。浪尖很早以前发过一篇文章,彻底讲解过foreach和foreachPartition的区别。可以参考理解
Spark源码系列之foreach和foreachPartition的区别
回到正题,之所以会发生不可序列化的错误,主要原因是传递给map的f函数不是在driver端执行的,所以会被序列化传输到executor节点,然后在executor节点反序列化然后执行。假如f函数里引用了map外部不可序列化的对象就会报不可序列化的异常。
但是,很多时候我们并没有直接去在闭包里使用不可序列化的对象,这个时候报异常就有点不合适了。比如下面的例子:
* class SomethingNotSerializable {
* def someValue = 1
* def scope(name: String)(body: => Unit) = body
* def someMethod(): Unit = scope("one") {
* def x = someValue
* def y = 2
* scope("two") { println(y + 1) }
* }
* }
此示例中,scope(two) 不可序列化,因为它引用了scope(one)(通过y),而scope(one)引用了SomethingNotSerializable(通过someValue)。但是,其实scope(two)并不直接依赖于SomethingNotSerializable。假如这种情况下抛出不可序列化异常就不科学了,所以Spark会对闭包进行一些清理操作,也即是本文中所要讲的。
主要 工具 类是ClosureCleaner。 该工具的主要作用是遍历闭包的层次结构,并且将没有被闭包实际引用的链路设置为 null ,但是仍然包含在已经编译的匿名类中。请注意直接修改封闭中的闭包是不安全的,因为可能有其他代码路径会依赖于他们。所以,我们会克隆封闭中的闭包并且相应地设置父指针。
默认情况下,可以传递清除闭包。这就意味着,我们需要检测封闭对象是否由起始对象实际引用,(要么直接引用要么间接引用),如果没有被实际使用则从层次结构中切断这些闭包。换句话说,除了清空无用字段的引用之外,也会将没有被起始闭包引用的引用封闭对象的父指针清空。传递性的确定是通过遍历闭包所调用的
再回到前面的例子, scope(two) 不可序列化,因为它引用了scope(one)(通过y),而scope(one)引用了SomethingNotSerializable(通过someValue)。但是,其实scope(two)并不直接依赖于SomethingNotSerializable。这就意味着我们可以安全的将其副本scope(one)的父指针清空,同时将其设置为scope(two)的父级,这样scope(two)就不再需要间接传递引用SomethingNotSerializable了。
解决方法
实现序列化是最直接的,假如不能的话。那就读下面的话:
那么为了不实现序列化还能尽量避免不可序列化错误,就不要在map等算子里引用外部变量,而是直接在算子中实例化,假如每次实例化代价高,那就使用mapPartitions。
还不懂,下次就只能上源码了~
推荐阅读:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Music Recommendation and Discovery
Òscar Celma / Springer / 2010-9-7 / USD 49.95
With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!