内容简介:Pandas、Numpy是做数据分析最常使用的Python包,如果数据存在Hadoop又想用Pandas做一些数据处理,通常会使用PySpark的可以看到,近500w数据的toPandas操作,开启arrow后,粗略耗时统计从39s降低为2s。如何开启arrow,就是spark.sql.execution.arrow.enabled=true这个配置了,spark2.3开始支持。
用ApacheArrow加速PySpark
Pandas、Numpy是做数据分析最常使用的 Python 包,如果数据存在Hadoop又想用Pandas做一些数据处理,通常会使用PySpark的 DataFrame.toPandas() 这个方法。让人不爽的是,这个方法执行很慢,数据量越大越慢。
做个测试
Using Python version 2.7.14 (default, Oct 5 2017 02:28:52) SparkSession available as 'spark'. >>> def test(): ... from pyspark.sql.functions import rand ... from better_utils import TimeUtil ... start = TimeUtil.now_unix() ... df = spark.range(1 << 22).toDF('id').withColumn("x", rand()) ... df.toPandas() ... cost = TimeUtil.now_unix() - start ... print "耗时:{}s".format(cost) ... >>> test() 耗时:39s >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") >>> test() /Users/yulian/anaconda3/envs/python2/lib/python2.7/site-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use " 耗时:2s >>>
可以看到,近500w数据的toPandas操作,开启arrow后,粗略耗时统计从39s降低为2s。
如何开启arrow,就是spark.sql.execution.arrow.enabled=true这个配置了,spark2.3开始支持。
另外需要安装pip install pyarrow。
是什么
Arrow是一种跨语言的基于内存的列式数据结构。
在分布式系统内部,每个系统都有自己的内存格式,大量的 CPU 资源被消耗在序列化和反序列化过程中,并且由于每个项目都有自己的实现,没有一个明确的标准,造成各个系统都在重复着复制、转换工作,这种问题在微服务系统架构出现之后更加明显,Arrow 的出现就是为了解决这一问题。作为一个跨平台的数据层,我们可以使用 Arrow 加快大数据分析项目的运行速度。
需要明确的是,Apache Arrow 不是一个引擎,也不是一个存储系统,它是用来处理分层的列式内存数据的一系列格式和算法。
为什么
PySpark中使用DataFrame.toPandas()将数据从Spark DataFrame转换到Pandas中是非常低效的。
Spark和Python基于Socket通信,使用serializers/deserializers交换数据。
Python的反序列化pyspark.serializers.PickleSerializer使用cPickle模块的标准pickle格式。
Spark先把所有的行汇聚到driver上,然后通过初始转换,以消除Scala和 Java 之间的任何不兼容性,使用Pyrolite库的org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler去把Java对象序列化成pickle格式。
然后序列化后的数据分批发送个Python的worker子进程,这个子进程会反序列化每一行,拼成一个大list;最后利用 pandas.DataFrame.from_records() 从这个list来创建一个Pandas DataFrame。
上面的过程有两个明显问题:
1)即使使用CPickle,Python的序列化也是一个很慢的过程。
2)利用 from_records 来创建一个 pandas.DataFrame 需要遍历Python list,将每个value转换成Pandas格式。
Arrow可以优化这几个步骤:
1)一旦数据变成了Arrow的内存格式,就不再有序列化的需要,因为Arrow数据可以直接发送到Python进程。
2)当在Python里接收到Arrow数据后,pyarrow可以利用zero-copy技术,一次性的从整片数据来创建 pandas.DataFrame,而不需要轮询去处理每一行记录。另外转换成Arrow数据的过程可以在JVM里并行完成,这样可以显著降低driver的压力。
Arrow有点可以总结为:
* 序列化友好
* 向量化
序列化友好指的是,Arrow提供了一个内存格式,该格式本身是跨应用的,无论你放到哪,都是这个格式,中间如果需要网络传输这个格式,那么也是序列化友好的,只要做下格式调整(不是序列化)就可以将数据发送到另外一个应用里。这样就大大的降低了序列化开销。
向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按列进行处理的。这样就极大的加快了处理速度。
感兴趣的话可以看下Python各种序列化方案的对比:
http://satoru.rocks/2018/08/fastest-way-to-serialize-array/以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Rationality for Mortals
Gerd Gigerenzer / Oxford University Press, USA / 2008-05-02 / USD 65.00
Gerd Gigerenzer's influential work examines the rationality of individuals not from the perspective of logic or probability, but from the point of view of adaptation to the real world of human behavio......一起来看看 《Rationality for Mortals》 这本书的介绍吧!