内容简介:过去一段时间很长的一段时间内都在写用 spark streaming 来做一些规则引擎的工作,工作第一阶段暂时告一段落,这里简单做一下总结。streaming 不言而喻,也就是实时流式处理。严格来说 spark streaming 并不能算实时流式处理,它的工作原理是一种 micro batch 的方式,也就是说它会将很多 record 放在一起组成一个 batch 然后当成一个批处理作业进行处理。这也是它和 storm, flink 最本质的区别。
过去一段时间很长的一段时间内都在写用 spark streaming 来做一些规则引擎的工作,工作第一阶段暂时告一段落,这里简单做一下总结。
0. spark streaming 是什么
streaming 不言而喻,也就是实时流式处理。严格来说 spark streaming 并不能算实时流式处理,它的工作原理是一种 micro batch 的方式,也就是说它会将很多 record 放在一起组成一个 batch 然后当成一个批处理作业进行处理。这也是它和 storm, flink 最本质的区别。
micro batch 的好处在于吞吐更大,延迟取决于 batch interval,如果对于实时要求不是特别的高,同时也在使用 Spark 的其他功能,Spark Streaming 往往是一个不错的选择。
1. 数据模型
DStream,也就是 discretized stream,是 Spark Streaming 提供了一种 high level 的抽象,用来表示数据流,数据一般从 Receiver(比如 Kafka, Flume等) 中获取。在内部,DStream 由一系列的 RDD 组成。RDD 是 Spark 中定义的一种数据模型,全称是 Resilent Distributed Dataset,可以简单理解为一个不可变的分布式数据集合。这些写代码的时候就可以像下面这么写:
DStream.foreachRDD(rdd => {rdd 处理})
大括号里面的 rdd 处理和普通的 Spark 程序处理基本没有区别,主要是通过一系列的 RDD 算子构造一个 DAG。这样其实就是把 Spark Streaming 转化成了一个个 Spark 作业了。
2. Streaming Context
StreamingContext 是 Spark Streaming 程序的入口,我们一般先初始化一个 SparkConf,然后 StreamingContext 初始化的时候使用这个 SparkConf,代码如下。
val conf = new SparkConf().setMaster("local[2]").setAppName("Example App") val ssc = new StreamingContext(conf, batchInterval) // create DStream with ssc // Dstream process ssc.start() ssc.awaitTermination()
local[2] 表示 local 模式使用 2 个线程运行 Spark Streaming 程序,注意如果是 local 模式一定要多初始化几个线程,因为 receiver 会独占一个线程,也就是 n > receiver_num。
3. Receiver
通过上面初始化的 ssc 就可以构造 DStream 了,比如 Spark 自带的 WordCount 示例代码。
// tcp as receiver val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) // kafka as receiver val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) // flume as receiver // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
Spark Streaming 支持的 receiver 有 Kafka, Kinesis, Flume, Tcp socket,已经通过其他算子产生的流。除此之外还支持 custom Receiver,customReceiver 继承类 org.apache.spark.streaming.receiver.Receiver
,然后实现特定的方法即可。示例代码如下。
// Create an input stream with the custom receiver on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt)) class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() logInfo("Stopped receiving") restart("Trying to connect again") } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } } }
4. DStream Join
如果在一个 Spark Streaming 程序里面要处理多个 DStream 怎么办呢?DStream Join
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
5. 算子介绍
Spark RDD 支持的算子基本都可以应用到 DStream 上。
Transformation
- map (func):DStream 每个 record 通过 func 产生一条新的记录,并组成一个新的 DStream
- flatMap (func): 类似 map ,区别在于每个 record 可以产生多个 record
- filter (func): 返回 func(record) = true 的 record 组成的新的 DStream
- repartition (numPartition): 调整 DStream 的 partition 个数
- union (otherStream): 组合多个 DStream
- count (): 返回 DStream 的个数
- reduce (func): 通过 func 处理 Dstream 里的所有元素最后返回一个值
- countByValue (): 返回 key 的频率统计
- reduceByKey (func, [numTasks]): 对于 pair 数据 (K,V),对相同的 K 进行聚合执行操作 func(V1, V2)
- join (otherStream, [numTasks])
- cogroup (otherStream, [numTasks])
- transform (func): 对 DStream 里面的每个 RDD 都执行一下 func 操作,返回一个新的 DStrem
- updateStateByKey (func): 通过联合多个 DStream,保存 key 的状态信息。
Output Operation
- print (): 返回 DStream 里面每个 batch 的前十个元素
- saveAsTextFiles (prefix, [suffix]): 保存为 Text File
- saveAsObjectFiles (prefix, [suffix]): 保存为 SequenceFiles
- saveAsHadoopFiles (prefix, [suffix]): 保存为 Hadoop 文件
- foreachRDD (func): 对于 DStream 中的每个 RDD 执行 func 操作,func 操作执行在 Driver 上。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Google API开发详解
江宽,龚小鹏等编 / 电子工业 / 2008-1 / 59.80元
《Google API开发详解:Google Maps与Google Earth双剑合璧》从易到难、由浅入深、循序渐进地介绍了Google Maps API和Google Earth API的开发技术。《Google API开发详解:Google Maps与Google Earth双剑合璧》知识讲解通俗易懂,并有大量的实例供读者更加深刻地巩固所学习的知识,帮助读者更好地进行开发实践。 《Go......一起来看看 《Google API开发详解》 这本书的介绍吧!
MD5 加密
MD5 加密工具
html转js在线工具
html转js在线工具