内容简介:SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spar
其他更多 java 基础文章:
java基础学习(目录)概述
SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
运行原理
Spark Streaming架构
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch interval(如5秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备
DStream
DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源赖获取,也可以通过现有的Dstream的transformation操作来获得。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。
下面是DStream的创建例子:
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount") .set("spark.testing.memory","2147480000"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999); 复制代码
API
transform算子
Transformation | 含义 |
---|---|
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
count() | 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStrea |
reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream |
cogroup(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
updateStateByKey(func) | 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream |
Windows Operation
总结:
- batch interval:5s
每隔5秒切割一次batch,封装成DStream - window length:15s
进行计算的DStream中包含15s的数据。即3个batch interval - sliding interval:10s
每隔10s取最近3个batch(window length)封装的DStream,封装成一个更大的DStream进行计算
/** * batch interval:5s * sliding interval:10s * window length: 60s * 所以每隔 10s 会取 12 个 rdd,在计算的时候会将这 12 个 rdd 聚合起来 * 然后一起执行 reduceByKeyAndWindow 操作 * reduceByKeyAndWindow 是针对窗口操作的而不是针对 DStream 操作的 */ JavaPairDStream<String, Integer> searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }, Durations.seconds(60), Durations.seconds(10)); 复制代码
优化Windows Operation
假设 batch=1s, window length=5s, sliding interval=1s, 那么每个 DStream 重复计算了 5 次,优化后, (t+4)时刻的 Window 由(t+3)时刻的 Window 和(t+4)时刻的 DStream 组成, 由于(t+3)时刻的 Window 包含(t-1)时刻的 DStream,而(t+4)时刻的 Window 中不需要包含(t-1) 时刻的 DStream,所以还需要减去(t-1)时刻的 DStream,所以: Window(t+4) = Window(t+3) + DStream(t+4) - DStream(t-1)。 注意,使用此方法必须设置checkpoint目录,用来保存Window(t+3)的数据//必须设置 checkpoint 目录 jssc.checkpoint("hdfs://node01:8020/spark/checkpoint"); JavaPairDStream<String, Integer> searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } },new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 - v2; } }, Durations.seconds(60), Durations.seconds(10)); 复制代码
Driver HA
提交任务时设置
spark-submit –supervise 复制代码
以集群方式提交到 yarn 上时, Driver 挂掉会自动重启,不需要任何设置
提交任务,在客户端启动 Driver,那么不管是提交到 standalone 还是 yarn, Driver 挂掉后 都无法重启
代码中配置
上面的方式重新启动的 Driver 需要重新读取 application 的信息然后进行任务调度,实 际需求是,新启动的 Driver 可以直接恢复到上一个 Driver 的状态(可以直接读取上一个 StreamingContext 的 DSstream 操作逻辑和 job 执行进度,所以需要把上一个 StreamingContext 的元数据保存到 HDFS 上) ,直接进行任务调度,这就需要在代码层面进 行配置。
public class SparkStreamingOnHDFS { public static void main(String[] args) { final SparkConf conf = new SparkConf() .setMaster("local[1]") .setAppName("SparkStreamingOnHDFS"); //这里可以设置一个线程,因为不需要一个专门接收数据的线程,而是监控一个目录 final String checkpointDirectory = "hdfs://node01:9000/spark/checkpoint"; JavaStreamingContextFactory factory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { return createContext(checkpointDirectory,conf); } }; JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory); jsc.start(); jsc.awaitTermination(); // jsc.close(); } @SuppressWarnings("deprecation") private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) { System.out.println("Creating new context"); SparkConf sparkConf = conf; //每隔 15s 查看一下监控的目录中是否新增了文件 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(15)); ssc.checkpoint(checkpointDirectory); /** * 只是监控文件夹下新增的文件,减少的文件是监控不到的, 文件内容有改动也是监控不到 */ JavaDStream<String> lines = ssc.textFileStream("hdfs://node01:8020/spark"); /** * 接下来可以写业务逻辑,比如 wordcount */ return ssc; } } 复制代码
执行一次程序后, JavaStreamingContext 会在 checkpointDirectory 中保存,当修 改了业务逻辑后,再次运行程序, JavaStreamingContext.getOrCreate(checkpointDirectory, factory); 因为 checkpointDirectory 中有这个 application 的 JavaStreamingContext,所以不会 调用 JavaStreamingContextFactory 来创建 JavaStreamingContext,而是直接 checkpointDirectory 中的 JavaStreamingContext,所以即使业务逻辑改变了,执行的效 果也是之前的业务逻辑, 如果需要执行修改过的业务逻辑,可以修改或删除 checkpointDirectory
以上所述就是小编给大家介绍的《Spark Streaming学习——DStream》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 一文读懂监督学习、无监督学习、半监督学习、强化学习这四种深度学习方式
- 学习:人工智能-机器学习-深度学习概念的区别
- 统计学习,机器学习与深度学习概念的关联与区别
- 混合学习环境下基于学习行为数据的学习预警系统设计与实现
- 学习如何学习
- 深度学习的学习历程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大型网站技术架构演进与性能优化
许令波 / 电子工业出版社 / 2018-6 / 79
《大型网站技术架构演进与性能优化》从一名亲历者的角度,阐述了一个网站在业务量飞速发展的过程中所遇到的技术转型等各种问题及解决思路。从技术发展上看,网站经历了Web应用系统从分布式、无线多端、中台到国际化的改造;在解决大流量问题的方向上,涉及了从端的优化到管道到服务端甚至到基础环境优化的各个层面。 《大型网站技术架构演进与性能优化》总结的宝贵经验教训可以帮助读者了解当网站遇到类似问题时,应如何......一起来看看 《大型网站技术架构演进与性能优化》 这本书的介绍吧!