为啥Spark 的Broadcast要用单例模式

栏目: 服务器 · 发布时间: 5年前

内容简介:很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因:先看例子,后面逐步揭晓内部机制。下面是一个双重检查式的broadcast变量的声明方式。

很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因:

  1. 广播变量大多数情况下是不会变更的,使用单例模式可以减少spark streaming每次job生成执行,重复生成广播变量带来的开销。
  2. 单例模式也要做同步。这个对于很多新手来说可以不用考虑同步问题,原因很简单因为新手不会调整spark 程序task的调度模式,而默认采用FIFO的调度模式,基本不会产生并发问题。1).假如你配置了Fair调度模式,同时修改了Spark Streaming运行的并行执行的job数,默认为1,那么就要加上同步代码了。2).还有一个原因,在多输出流的情况下共享broadcast,同时配置了Fair调度模式,也会产生并发问题。
  3. 注意。有些时候比如广播配置文件,规则等需要变更broadcast,在使用fair的时候可以在foreachrdd里面使用局部变量作为广播,避免相互干扰。

先看例子,后面逐步揭晓内部机制。

1.例子

下面是一个双重检查式的broadcast变量的声明方式。

object WordBlacklist { 
 
  @volatile private var instance: Broadcast[Seq[String]] = null 
 
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 
    if (instance == null) { 
      synchronized { 
        if (instance == null) { 
          val wordBlacklist = Seq("a", "b", "c") 
          instance = sc.broadcast(wordBlacklist) 
        } 
      } 
    } 
    instance 
  } 
} 

广播变量的使用方法如下:

val lines = ssc.socketTextStream(ip, port) 
    val words = lines.flatMap(_.split(" ")) 
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => 
      // Get or register the blacklist Broadcast 
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 
      // Get or register the droppedWordsCounter Accumulator 
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 
      // Use blacklist to drop words and use droppedWordsCounter to count them 
      val counts = rdd.filter { case (word, count) => 
        if (blacklist.value.contains(word)) { 
          droppedWordsCounter.add(count) 
          false 
        } else { 
          true 
        } 
      }.collect().mkString("[", ", ", "]") 
      val output = s"Counts at time $time $counts" 
      println(output) 
      println(s"Dropped ${droppedWordsCounter.value} word(s) totally") 
      println(s"Appending to ${outputFile.getAbsolutePath}") 
      Files.append(output + "\n", outputFile, Charset.defaultCharset()) 
    } 

2.概念补充

为啥Spark 的Broadcast要用单例模式

首先,一个基本概念就是Spark应用程序从开始提交到task执行分了很多层。

  1. 应用调度器。 主要是资源管理器,比如standalone,yarn等负责Spark整个应用的调度和集群资源的管理。
  2. job调度器。s park 的算子分为主要两大类,transform和action,其中每一个action都会产生一个job。这个job需要在executor提供的资源池里调度执行,当然并不少直接调度执行job。
  3. stage划分及调度。 job具体会划分为若干stage,这个就有一个基本的概念就是宽依赖和窄依赖,宽依赖就会划分stage。stage也需要调度执行,从后往前划分,从前往后调度执行。
  4. task切割及调度。 stage往下继续细化就是会根据不太的并行度划分出task集合,这个就是在executor上调度执行的基本单元,目前的调度默认是一个task一个cpu。
  5. Spark Streaming 的job生成是周期性的。 当前job的执行时间超过生成周期就会产生job 累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制

3.Spark Streaming job生成

这个源码主要入口是StreamingContext#JobScheduler#JobGenerator对象,内部有个RecurringTimer,主要负责按照批处理时间周期产生GenrateJobs事件,当然在存在windows的情况下,该周期有可能不会生成job,要取决于滑动间隔,有兴趣自己去揭秘,浪尖星球里分享的视频教程里讲到了。具体代码块如下

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
   longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 

我们直接看其实现代码块:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
 
      override protected def onError(e: Throwable): Unit = { 
        jobScheduler.reportError("Error in job generator", e) 
      } 
    } 
    eventLoop.start() 

event处理函数是processEvent方法

/** Processes all events */ 
  private def processEvent(event: JobGeneratorEvent) { 
    logDebug("Got event " + event) 
    event match { 
      case GenerateJobs(time) => generateJobs(time) 
      case ClearMetadata(time) => clearMetadata(time) 
      case DoCheckpoint(time, clearCheckpointDataLater) => 
        doCheckpoint(time, clearCheckpointDataLater) 
      case ClearCheckpointData(time) => clearCheckpointData(time) 
    } 
  } 

在接受到GenerateJob事件的时候,会执行generateJobs代码,就是在该代码内部产生和调度job的。

/** Generate jobs and perform checkpointing for the given `time`.  */ 
  private def generateJobs(time: Time) { 
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") 
    Try { 
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
      graph.generateJobs(time) // generate jobs using allocated block 
    } match { 
      case Success(jobs) => 
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
      case Failure(e) => 
        jobScheduler.reportError("Error generating jobs for time " + time, e) 
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
    } 
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  } 

可以看到代码里首先会执行job生成代码

graph.generateJobs(time) 
 
具体代码块儿 
 
def generateJobs(time: Time): Seq[Job] = { 
    logDebug("Generating jobs for time " + time) 
    val jobs = this.synchronized { 
      outputStreams.flatMap { outputStream => 
        val jobOption = outputStream.generateJob(time) 
        jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
        jobOption 
      } 
    } 
    logDebug("Generated " + jobs.length + " jobs for time " + time) 
    jobs 
  } 

每个输出流都会生成一个job,输出流就类似于foreachrdd,print这些。其实内部都是ForEachDStream。所以生成的是一个job集合。

然后就会将job集合提交到线程池里去执行,这些都是在driver端完成的哦。

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
 
具体h函数内容 
def submitJobSet(jobSet: JobSet) { 
    if (jobSet.jobs.isEmpty) { 
      logInfo("No jobs added for time " + jobSet.time) 
    } else { 
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
      jobSets.put(jobSet.time, jobSet) 
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
      logInfo("Added jobs for time " + jobSet.time) 
    } 
  } 

其实就是遍历生成的job集合,然后提交到线程池jobExecutor内部执行。这个也是在driver端的哦。

jobExecutor就是一个固定线程数的线程池,默认是1个线程。

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  private val jobExecutor = 
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") 

需要的话可以配置spark.streaming.concurrentJobs来同时提交执行多个job。

那么这种情况下,job就可以并行执行了吗?

显然不是的!

还要修改一下调度模式为Fair,详细的配置可以参考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

简单的均分的话只需要

conf.set("spark.scheduler.mode", "FAIR") 

然后,同时运行的job就会均分所有executor提供的资源。

这就是整个job生成的整个过程了哦。

因为Spark Streaming的任务存在Fair模式下并发的情况,所以需要在使用单例模式生成broadcast的时候要注意声明同步。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

从零到百亿

从零到百亿

〔美〕卡罗·白朗、Karel Baloun、卡罗·白朗 / 译言网 / 中国书籍出版社 / 2007 / 15.00元

Facebook是一个发源于哈佛大学、为全美大学生服务的社交网站。按照流量,这个网站在世界范围排名第8名;按照价值,业界对Facebook公司的估值超过10亿美元。Facebook创建于2004年2月,这样的高速增长成为当今互联网发展的一个奇迹。 《Inside Facebook》这本书是原作者卡罗·白朗(Karel Baloun)作为Facebook的第一位高级软件开发人员之一,在Face......一起来看看 《从零到百亿》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具