WLDA--Spark与MPI碰撞的火花

栏目: 服务器 · 发布时间: 5年前

内容简介:恰逢Spark-2.4.0发布不久之际,接到LDA算法实现需求,经过一番了解,算法同学反馈SparkLDA极其的慢,而且非常占资源,虽然业界也有各种各样的实现,但是很难跟现有的平台融合起来,始终找不到一个好用的版本,为此,我们开始着手自行开发实现LDA。Spark目前对LDA有两种实现,一种是基于GraphX实现的EM算法,使用Gibbs采样,但是这种实现训练速度慢,同时极其耗费内存,另一种是基于在线变分贝叶斯实现的Online LDA算法,训练速度相对于EM算法快,但是它的模型是全部存放在Driver上,

恰逢Spark-2.4.0发布不久之际,接到LDA算法实现需求,经过一番了解,算法同学反馈SparkLDA极其的慢,而且非常占资源,虽然业界也有各种各样的实现,但是很难跟现有的平台融合起来,始终找不到一个好用的版本,为此,我们开始着手自行开发实现LDA。Spark目前对LDA有两种实现,一种是基于GraphX实现的EM算法,使用Gibbs采样,但是这种实现训练速度慢,同时极其耗费内存,另一种是基于在线变分贝叶斯实现的Online LDA算法,训练速度相对于EM算法快,但是它的模型是全部存放在Driver上,每轮迭代需要广播,如果模型较大,性能会极度恶化。自行开发的话需要同时兼顾性能、易用性,同时要跟现有的平台完美融合,不能专门发明新的玩法,否则也只能用于吹牛。在Review Spark的新特性中发现,Spark-2.4.0版本提出一种新的调度方式 Barrier Scheduling ,通过该特性可以很好地将Spark与MPI融合,所以我们想到可以尝鲜这种新特性,通过MPI实现LDA核心算法,再通过Spark包装提升易用性,可以完美地解决用户的痛点。虽然Spark的Barrier Scheduling特性还处于实验阶段,但是总体上符合我们的需求,通过一些扩展,我们基于Spark和MPI开发了WLDA,性能上可以大幅超越SparkLDA,同时易用性方面可以与SparkLDA媲美。

Spark与MPI的恩怨情仇

早在Spark问世以前,分布式机器学习领域始终没出现一个大家都认可的框架,基本都是各玩各的,有使用Mahout的,有使用MPI的,等等,但是这些框架或接口要么是性能太差,要么是易用性太差,例如Mahout,开始是基于MapReduce实现,可以很容易提交到Hadoop集群,但是其性能始终是个痛点,基于MPI自己撸的话,使用灵活性又很差,跟Hadoop生态很难融合到一起。Spark的横空出世打破了这种尴尬的局面,其MLlib在易用性、以及性能方面都做到用户较为满意的一个程度,MPI的易用性和灵活性曾一度被Spark拥护者们痛批,然而近年来由于数据量的爆发以及深度学习的风靡,Spark MLlib大有被各种深度学习框架淘汰的趋势,让Spark哭笑不得的是,各深度学习框架,如TensorFlow、PyTorch等都开始拥抱MPI,为了求生存,Spark也开始不得不打出AI的口号,支持MPI,融合深度学习框架,做Spark+AI的拓展,最近Spark-2.4.0版本推出新的调度方式 Barrier Execution Mode 就是为了支持MPI调度(详细解读请参考我的另一篇博客)。

总体流程

根据Spark Barrier Scheduling思路,我们可以在Task中拉起MPI进程,每个MPI进程处理单个Partition的数据,这样Spark就将每个Task的工作委托给MPI进程来完成,整体分布式运行架构如下图所示。

WLDA--Spark与MPI碰撞的火花

实际运行过程中,Spark Task(JVM)与MPI Process(C++ Native)之间的时序关系如下图所示。

WLDA--Spark与MPI碰撞的火花

在Spark Task拉起MPI-Process后,首先会给MPI-Process喂数据,喂完数据后,一直等待MPI-Process的输出;MPI-Process加载完Spark Task喂过来的数据后,开始迭代训练,训练完毕后,将输出传回给Spark Task,并结束退出;Spark-Task获取MPI-Process输出后可以继续其他工作。

MPI核心实现

为了对标Spark,WLDA核心是参考Spark LDA EM算法来实现的,分布式运行基础是基于MPI实现,使用openmp并行加速,如下图所示,MPI进程分为2组:word进程和document进程。

WLDA--Spark与MPI碰撞的火花

Word进程

Word进程是用来存储WLDA的模型,每个进程只负责模型的一部分,模型均匀分布在不同的word进程中。Word进程负责为doc进程提供word-topic模型参数,响应doc进程发送过来的模型更新以及拉取消息。其中WLDA的模型指的是word-topic矩阵(矩阵大小 = 词汇数目 x 主题数目),矩阵每行表示语料库中的一个词在各个topic中的权重,由于LDA算法本身算是一种无监督算法,一般训练完毕后模型只是一个产物。

Document进程

Doc进程是用来训练WLDA文档的,采用数据并行,每个进程只持有语料库的一部分文档,同时维护训练过程中该部分文档的主题分布,即doc-topic矩阵(矩阵大小 = 文档数目 x 主题数目),也就是我们需要得到的结果。doc进程从word进程获取word-topic参数和global_topic参数(由word-topic矩阵按行叠加),根据Gibbs Sampling算法为每个词的重新选取主题,将词的主题选取情况发送消息给word进程,通知其更新对应的模型参数。

训练过程

整个分布式训练过程描述如下:

  1. 初始化输入
    • Doc进程读取文档输入,每行一个文档,与Spark兼容,libsvm格式,(docId wordid:wordcount …)
    • Word进程启动模型服务,开始时word-topic矩阵均为0
  2. 随机初始化word-topic矩阵和doc-topic矩阵
    • Doc进程随机为每个词选取主题,更新本地的doc-topic矩阵,以及通过MPI消息通知word进程更新word-topic矩阵
    • Word进程等待doc进程推过来的word-topic矩阵,更新模型参数
  3. 训练迭代
    • Doc进程从word进程拉取global_topic参数(由word-topic矩阵按行叠加),使用Gibbs Sampling算法对本地每个词重新选取主题,更新doc-topic矩阵,并通过MPI消息通知word进程更新word-topic矩阵
    • Word进程等待doc进程拉取word-topic参数和global_topic参数、以及更新word-topic矩阵
  4. 训练完毕输出
    • Doc进程输出doc-topic矩阵
    • Word进程输出word-topic矩阵

Spark包装

上述MPI实现实际上是很难提交到我们现有的平台,例如需要提交Hadoop Yarn集群运行,首先需要基于Yarn的API写一个分布式程序来驱动,另外需要在MPI程序中支持不同文件系统的访问,这么一来需要连带的开发量特别大,而且用起来也不灵活,因为已有的平台要支持你这种新玩法需要开发新功能来支持。

为了解决易用性以及灵活性问题,我们通过Spark-2.4.0中的Barrier Scheduling来驱动我们的WLDA,这样在正式进入LDA算法前,可以通过Spark的一些操作提前做相关ETL工作,例如文档分词、词频向量化等前序工作。在我之前的一篇博客中已阐述Barrier Scheduling的用法,官方目前仅仅是实现了一个简易框架,很多细节还没完善,但是要包装我们的WLDA还是有些局限性,主要表现在以下几点:

mpirun

为解决以上几个局限性,我们在Barrier Scheduling基础上做了一些扩展,但是不污染Spark本身的源码,整体实现流程如下图所示。

WLDA--Spark与MPI碰撞的火花

上图流程描述如下:

  1. Spark-Task通过 MPICH 中的Hydra单独拉起拉起MPI-Process,不需要SSH支持,MPI-Process启动后准备好环境,创建“输入”和“输出”命名管道,“输入”命名管道用于Spark向MPI-Process喂数据,“输出”命名管道用户MPI-Process向Spark传回计算结果。
  2. 通过一次Barrier操作,等待所有的MPI-Process进程都起来。
  3. Spark Task向“输入”命令管道写数据,MPI-Process读取“输入”命名管道,实现Spark给MPI-Process喂数据。
  4. MPI-Process开始迭代训练。
  5. 训练结束后,MPI-Process写“输出”命名管道,Spark Task读取“输出”命名管道完成结果的收集。
  6. 通过一次Barrier操作,等待所有的MPI-Process进程都结束。

该流程解决了Spark Task与MPI-Process之间数据交换问题,同时不借助SSH解决了MPI-Process拉起的问题,整个流程我们将它封装为一个API,通过Scala的隐式转换方式扩充为RDD的一个算子,对于选择性交换数据问题,我们通过API参数来控制,该算子的原型如下:

/**
 * Execute MPI Program
 *
 * @param mpiProgram The mpi start command
 * @param sendFunc Send the rdd data to mpi program
 * @param sendBlackList Partition id in sendBlackList will not be send data
 * @param outputPath The result output path
 * @param outputBlackList Partition id in outputBlackList will have not output
 */
def mpi(
    mpiProgram: String,
    sendFunc: Option[(Iterator[T], OutputStream) => Unit] = None,
    sendBlackList: Set[Int] = Set.empty,
    outputPath: Option[String] = None,
    outputBlackList: Set[Int] = Set.empty): Unit

mpi 算子为一个action操作,其中: mpiProgram 为mpi二进制启动命令; sendFunc 为用户自定义Spark给MPI-Process喂数据的逻辑,默认Spark将每条记录以文本形式喂给MPI-Process; sendBlackList 指示哪些partition不用喂数据, outputPath 为最终结果保存路径, outputBlackList 指示哪些partition没有输出。

如果需要通过Spark驱动一个MPI程序,可以通过如下调用方式实现:

rdd.mpi(...)

对于WLDA,借助我们扩充的 mpi 算子很容易按如下方式实现:

class WLDAPartitioner(override val numPartitions: Int, numWordProcesses: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val pid = key.asInstanceOf[Long] % (numPartitions - numWordProcesses) + numWordProcesses
    pid.toInt
  }
}

object WLDA {
  def main(args: Array[String]): Unit = {
    // val params = ...
    val rdd = sc.textFile(params.inputPath)
      .zipWithIndex()
      .map { x => (x._2, x._1) }
      .reduceByKey(
        new WLDAPartitioner(numPartitions, params.numWordProcesses),
        (a: String, _: String) => a)
      .map(_._2)

    val mpiProgram = s"./wlda " +
      s"--deploy_mode=spark " +
      s"--vocab_size=${params.numVocab} " +
      s"--num_topics=${params.numTopics} " +
      s"--num_iterations=${params.numIterations} " +
      s"--num_word_procs=${params.numWordProcesses}"

    val sendBlackList = (0 until params.numWordProcesses).toSet
    rdd.mpi(mpiProgram, sendBlackList = sendBlackList, outputPath = Some(params.outputPath))
  }
}

上述代码显示通过一个自定义的Partitioner,将数据均匀分区到doc processes对应的分区中,word processes对应的分区将不会有数据, mpi 算子中的 sendBlackList 参数包含word processes对应的分区Id,指示不用给这些分区传数据。具体提交上述Spark程序时,要通过 --files 选项上传二进制文件 mpiexec.hydrawlda ,这些二进制文件编译的时候是以静态链接方式构建,以避免每台机器都装mpi基础库,所以通过这种包装,是可以提交到公司任何Spark平台(例如Tesla、自建Hadoop集群等)运行的。

性能

基于上述实现,我们将WLDA与SparkLDA做了一个对比测试,数据集使用 pubmed ,820W篇文档,词汇量为141044,训练1000个主题,使用相同的资源(1000 vcore、2T memory),100轮迭代性能情况如下:

  • Spark EM LDA: >350min
  • Spark Online LDA: Driver OOM
  • WLDA: 12min

可以看到,WLDA性能大幅优于SparkLDA,然而使用上跟SparkLDA无异,对用户无门槛。

小结

本文阐述了我们自研的一种LDA实现”WLDA”,它同时基于Spark和MPI,在性能和易用性能方面做到兼顾,该种做法并不是仅对于WLDA有效,可以拓展到其他算法,我们扩展了一个 RDD 的算子 mpi ,该算子可以轻松方便地将Spark与MPI程序协同,可以起到Spark和MPI的桥梁作用。


以上所述就是小编给大家介绍的《WLDA--Spark与MPI碰撞的火花》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

淘宝、天猫电商运营百科全书

淘宝、天猫电商运营百科全书

刘涛 / 电子工业出版社 / 2016-7 / 59.00元

有人说淘宝、天猫上90%的卖家不赚钱,我认为说得有点大了。因为如果说大家都不赚钱或者在亏钱,为什么去年在做店铺的卖家,今年还在继续?那些不赚钱的卖家,多数是没意识到市场的变化,还在用原来的套路运营店铺。市场在变,但卖家的思路却没有转变,不赚钱也在情理之中,因为淘宝、天猫的玩法变了。做店铺就是好比一场“打怪”升级的游戏,每次的升级都需要强大的装备与攻略。优胜劣汰,能活下去并且能赚钱的卖家,都是在不停......一起来看看 《淘宝、天猫电商运营百科全书》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换