Summary: Spark Source Code Analysis: DStream
0x00 Preface
This is the second post in the Spark source code analysis series. It uses the source code to examine the single most important concept in the design of Spark Streaming: DStream.
The focus is the DStream in Spark Streaming. Its importance needs little justification: once you understand these few core data structures, it becomes much easier to get an overall grasp of Spark.
As with the RDD post, although the subject is DStream, the whole article revolves around one concrete example, so it also serves as an overview of the Spark Streaming source code.
Article structure
- Spark Streaming concepts, mainly those related to DStream
- The overall design of DStream
- A deep dive through a concrete example
0x01 Concepts
What is Spark Streaming
Scalable, high-throughput, fault-tolerant stream processing of live data streams!
A real-time, or more precisely near-real-time, system. No further description here.
One point worth noting: Streaming jobs are ultimately turned into Spark jobs and executed by the Spark engine.
DStream
It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.
An RDD is defined as "a read-only, partitioned collection of records", while a DStream is in turn a template for RDDs, so we can likewise treat a DStream as a dataset.
My simple mental model: a DStream is a data structure that adds one more layer on top of RDDs. Below is the diagram from the official site describing DStream.
How Spark Streaming differs from other stream processing systems
The following comes from the paper by the Spark authors. It is very well written, so rather than paraphrasing it I simply quote the points I focused on.
Stream processing frameworks can be divided into two kinds: record-at-a-time and the D-Stream processing model.
Record-at-a-time:
D-Stream processing model:
The difference between the two:
Record-at-a-time processing model. Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes).
D-Stream processing model. In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.
The problem with record-at-a-time:
In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.
0x02 Source Code Analysis
DStream
A DStream internally is characterized by a few basic properties:
- A list of other DStreams that the DStream depends on
- A time interval at which the DStream generates an RDD
- A function that is used to generate an RDD after each time interval
Three parts of the DStream data structure matter most:
- its parent dependencies
- the time interval at which it generates an RDD
- a function that generates the RDD
These map onto the code below. Each of them is implemented by concrete subclasses, as we will see later in the analysis. Now let's walk through the example step by step.
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null
}
An example
This is the most basic word count example from the official site, similar to the Spark one. Simple, but highly representative.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
This involves both the transformations between DStreams and the generation of RDDs. Let's look at the DStream transformations first.
DStream dependency relationships
The dependency (inheritance) relationships between the DStream classes are worth sorting out first, otherwise the rest is hard to follow. The full DStream class diagram is large, so only the few classes we focus on this time are listed.
Each component will not be described in detail here; there is just a diagram, which you can come back to while reading the source code later, when it will be clearer.
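Besides the class hierarchy, every transformation also builds a runtime parent chain through the dependencies field listed earlier. The following is only a small sketch, reusing the wordCounts stream from the example above and a hypothetical lineage helper, to show what that chain looks like:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: walk a DStream's parent chain via its public dependencies field.
def lineage(stream: DStream[_]): List[String] =
  stream.getClass.getSimpleName :: stream.dependencies.flatMap(lineage)

// For the word count example this prints something like:
// ShuffledDStream <- MappedDStream <- FlatMappedDStream <- SocketInputDStream
println(lineage(wordCounts).mkString(" <- "))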
1. Source code analysis: StreamingContext
Class
These are the main components of StreamingContext. We will not expand here on what StreamingContext itself does; we stick to the concrete example, and later posts will analyze some of the major components, such as DStreamGraph and JobGenerator.
- JobScheduler: generates and schedules the Spark jobs for each batch, using the two components below
- JobGenerator
- JobExecutor
- DStreamGraph: the container holding the dependency relationships between DStreams
- StreamingJobProgressListener: listens to streaming jobs and updates the StreamingTab
- StreamingTab: the UI tab for streaming jobs
- SparkUI: responsible for displaying it
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {...}
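As a side note on how user code reaches this private primary constructor, here is a hedged sketch of the usual entry points, assuming the standard public API; the checkpoint directory and the factory function are invented for the example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical checkpoint directory used only in this sketch.
val checkpointDir = "/tmp/streaming-ckpt"

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  // Public constructor: builds a SparkContext internally and passes a null
  // Checkpoint into the private primary constructor shown above.
  val newSsc = new StreamingContext(conf, Seconds(1))
  newSsc.checkpoint(checkpointDir)
  newSsc
}

// First run: creates a fresh context. On recovery, the Checkpoint argument of the
// primary constructor is reconstructed from the checkpoint directory instead.
val recoveredSsc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)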
First, let's see what the first line of code, val lines = ssc.socketTextStream("localhost", 9999), actually does. If you have read the RDD source code you will remember that a single line like this already triggers quite a few DStream constructions; let's go through it slowly.
socketTextStream returns a SocketInputDStream. So what exactly is a SocketInputDStream?
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
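Note that socketTextStream simply fixes the converter parameter to SocketReceiver.bytesToLines. Just as an illustration, here is a hedged sketch of calling socketStream directly with a hypothetical converter that parses each incoming line as an integer, reusing the example's ssc:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

// Hypothetical converter: read the socket line by line and parse each line as an Int.
def bytesToInts(in: InputStream): Iterator[Int] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null).map(_.trim.toInt)
}

val ints = ssc.socketStream[Int](
  "localhost", 9999, bytesToInts, StorageLevel.MEMORY_AND_DISK_SER_2)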
2. Source code analysis: SocketInputDStream
Class
Here we see that SocketInputDStream actually extends ReceiverInputDStream, which is the first level of the inheritance hierarchy; you can refer back to the diagram shown earlier.
It does not do much by itself: it mainly supplies its own SocketReceiver, while the other major methods are inherited from ReceiverInputDStream.
class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
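To see the getReceiver contract from the user side, here is a minimal, purely hypothetical receiver that stores a constant record once per second. It is only a sketch; it is plugged in through the public receiverStream API, which likewise produces a ReceiverInputDStream:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: hands the string "tick" to Spark Streaming once per second.
class TickReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    // Receive on a separate thread so that onStart returns immediately.
    new Thread("tick-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("tick")        // pass one record to Spark Streaming
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {}      // nothing to clean up in this sketch
}

val ticks = ssc.receiverStream(new TickReceiver)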
3. Source code analysis: ReceiverInputDStream
Class
ReceiverInputDStream is a fairly important class: a large portion of DStreams inherit from it, for example Kafka's input DStream. So it is a key class.
Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data. Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.
Note: it overrides the important compute method, which determines how the RDD is generated.
Also, ReceiverInputDStream itself extends InputDStream.
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {

  /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
  // ...
}
4. Source code analysis: InputDStream
Class
InputDStream is another important abstraction: it is the abstract base class of all input-related DStreams, such as FileInputDStream and the ReceiverInputDStream we just looked at.
This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively.
Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream.
For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files.
For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc) {

  override def dependencies: List[DStream[_]] = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }
  ...
}
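The class doc above says that input streams which can produce their data from the driver alone may extend InputDStream directly. Purely as an illustration of that pattern (similar in spirit to Spark's own ConstantInputDStream), here is a minimal hypothetical subclass; the class name and fields are invented for this sketch:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical driver-only input stream: every batch interval it emits the same
// in-memory sequence as an RDD, with no receiver running on the executors.
class StaticSeqInputDStream[T: ClassTag](ssc0: StreamingContext, data: Seq[T])
  extends InputDStream[T](ssc0) {

  override def start(): Unit = {}   // no service to start on the driver
  override def stop(): Unit = {}    // nothing to stop

  override def compute(validTime: Time): Option[RDD[T]] = {
    Some(ssc0.sparkContext.parallelize(data))
  }
}

val staticStream = new StaticSeqInputDStream(ssc, Seq("a", "b", "c"))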
Note: up to this point we have only finished the first line of code, the one that reads the data.
5. Source code analysis: DStream.flatMap
Method (and how a DStream generates RDDs)
DStream itself has already been introduced, so no need to repeat that; from here on we follow the order of the example.
Look at our first transformation, flatMap. It returns a FlatMappedDStream, passing in a function.
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
Next we turn to FlatMappedDStream, which is where the question of how the RDD is actually generated comes in.
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}
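The compute above just asks the parent for that batch's RDD and applies an ordinary RDD flatMap to it. From user code, the public transform API exposes the same per-batch RDD directly, so the following one-liner, a hedged sketch reusing the example's lines stream, has the same effect as lines.flatMap:

// Equivalent to lines.flatMap(_.split(" ")): transform hands us each batch's RDD
// and we return the transformed RDD for that batch.
val wordsViaTransform = lines.transform(rdd => rdd.flatMap(_.split(" ")))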
How does a DStream generate RDDs?
Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.
Internally, DStream uses a HashMap-typed variable, generatedRDDs, to record the RDDs that have already been generated.
Note: compute(time) is what actually generates the RDD.
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // Look up the RDD in generatedRDDs: if one exists for this time, return it;
  // otherwise fall into the orElse branch below.
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    // Check whether the time is valid.
    if (isTimeValid(time)) {
      // Call compute(time) here to obtain the RDD instance and store it in rddOption.
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        // This helper lives in the RDD code; I don't fully understand it, but judging from
        // the comment it roughly means "do not validate the output directory".
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Put the freshly created RDD into generatedRDDs at position time.
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
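The two branches above that persist and checkpoint the new RDD are driven by settings made in user code. A hedged usage sketch follows; the checkpoint directory and the 10-second interval are invented for the example (the example's batch interval is 1 second):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

// Where checkpoint data is written; required before DStream checkpointing takes effect.
ssc.checkpoint("/tmp/streaming-ckpt")

// Sets storageLevel on the DStream, so every generated batch RDD gets persisted.
val cachedWords = words.persist(StorageLevel.MEMORY_ONLY_SER)

// Sets checkpointDuration, so the batch RDD is checkpointed every 10 seconds
// (it must be a multiple of the slide duration).
cachedWords.checkpoint(Seconds(10))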
6. Source code analysis: DStream.map
Method
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
One thing worth pointing out is the compute function, parent.getOrCompute(validTime).map(_.map[U](mapFunc)). It calls the parent DStream's getOrCompute again; if the parent has already generated and cached an RDD for validTime, that RDD is simply taken out of generatedRDDs instead of being recomputed, and then the .map(_.map[U](mapFunc)) part is applied to it.
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
7. Source code analysis: reduceByKey
Method
With the experience of reading the RDD source code, it is easy to find that reduceByKey lives in the PairDStreamFunctions class. Let's look at its source.
Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.
def reduceByKey(
    reduceFunc: (V, V) => V,
    partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
  combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
}
Combine elements of each key in DStream’s RDDs using custom functions. This is similar to the combineByKey for RDDs.
At this point the pattern starts to feel familiar: the design is remarkably consistent with that of RDDs.
Here a ShuffledDStream appears. The shuffle itself is a bit involved and will not be covered for now; the shuffle internals deserve a more detailed look of their own.
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
  val cleanedCreateCombiner = sparkContext.clean(createCombiner)
  val cleanedMergeValue = sparkContext.clean(mergeValue)
  val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
  new ShuffledDStream[K, V, C](
    self,
    cleanedCreateCombiner,
    cleanedMergeValue,
    cleanedMergeCombiner,
    partitioner,
    mapSideCombine)
}
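To make the three functions concrete, here is a hedged usage sketch built on the example's pairs stream: per key and per batch it keeps a (sum, count) pair, from which an average could be derived. The partitioner choice is arbitrary and only for illustration:

import org.apache.spark.HashPartitioner

val sumAndCount = pairs.combineByKey[(Int, Int)](
  (v: Int) => (v, 1),                                           // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2), // mergeCombiner
  new HashPartitioner(ssc.sparkContext.defaultParallelism))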
8. Source code analysis: DStream.print
Method
The final print call is also interesting: it invokes DStream's print function. Look at the line firstNum.take(num).foreach(println); this is what prints the contents of the RDD.
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
And then?
Here I discovered yet another DStream: ForEachDStream. According to the comments, the print operation above should produce a ForEachDStream, but at the time I did not find the code, so I set it aside. (In fact, the foreachRDD call at the end of print is what constructs a ForEachDStream and registers it as an output stream.)
An internal DStream used to represent output operations like DStream.foreachRDD.
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
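Since print is just a thin wrapper around foreachRDD, the same mechanism is the usual way to push each batch to an external sink from the driver. A minimal usage sketch on the example's wordCounts stream:

wordCounts.foreachRDD { (rdd, time) =>
  // Any RDD action placed here triggers a Spark job for this batch,
  // driven by the generateJob shown above.
  val sample = rdd.take(10)
  println(s"Batch $time, sampled ${sample.length} records: ${sample.mkString(", ")}")
}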
0x03 Summary
This concludes the walkthrough of the DStream-related source code. Like the RDD post, this one is fairly basic and mainly sorts out the overall flow; later posts will dig into some of the finer details.
References
- Matei Zaharia's paper (the paper is genuinely well written)
- http://spark.apache.org/
- https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97
2017-05-25
Author: dantezhao | 简书 | CSDN | GITHUB
Homepage: http://dantezhao.com