内容简介:很多重要的应用要处理大量在线流式数据, 并返结果,比如社交网络趋势追踪,网站指标统计,广告系统,可以使用Spark Streaming来处理流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里
很多重要的应用要处理大量在线流式数据, 并返结果,比如社交网络趋势追踪,网站指标统计,广告系统,可以使用Spark Streaming来处理
流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务
Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里
Spark Streaming的 基本原理 是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据
Storm和SparkStreaming对比
Storm | SparkStreaming |
---|---|
流式计算框架 | 批处理计算框架 |
可以实现毫秒级响应 | 只能到秒级 |
以record为单位处理数据 | 以RDD为单位处理数据 |
支持micro-batch方式(Trident) , 将批处理转化为流式处理问题 | 支持micro-batch流式处理数据(Spark Streaming),将流式处理转化为批处理问题 |
目前SparkStreaming用的越来越多,为什么呢?
易用性好
- 供很多高级算子, 实现复杂运算非常简单
-
流式API和批处理API很类似, 学习成本低
平台统一 - 不需要维护两套系统分别用于批处理和流式处理
-
可以自由调用Spark的组件如Spark SQL & Mllib
生态丰富 - 支持各种数据源和数据格式
- 社区活跃, 发展迅猛
Spark Streaming的运行步骤:
- Driver端会要求Executor端启动接收器Receiver,接收数据
- 每个Receiver都会负责一个inputDStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)
- Receiver会把接收到的数据分解生多个block放在内存中,如果设置了多副本它会把数据备份,然后会把blocks的信息高速StreamingContext。
- 当一个批次的时间到了之后,StreamingContext会告诉SparkContext,让SparkContext把job分发到Eexcutor上去执行
编写Spark Streaming程序的基本步骤
- 通过创建输入DStream来定义输入源
- 通过对DStream应用转换操作和输出操作来定义流计算
- 用streamingContext.start()来开始接收数据和处理流程
- 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
- 可以通过streamingContext.stop()来手动结束流计算进程
//创建Spark Streaming的上下文 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); //获取数据 这里是从socket中获取 JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); //对获取的数据进行转换 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String x) { return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //输出结果 wordCounts.print(); //启动 ssc.start(); ssc.awaitTermination();
Spark Streaming的输入流的数据来源
监听一个文件夹下的文件的变化
应用场景: 日志处理
JavaDStream<String> lines = streamingContext.textFileStream("/data/input");
注意:textFileStream每个batch只会探测新产生的文件, 已有文件修改后, 不会
再被处理
以socket连接作为数据源读取数每隔固定周期从socket上拉取数据, 放入内
存形成RDD
JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("master", 9999 )
以RDD队列方式创建数据源
streamingContext.queueStream(queueOfRDD)
kafka作为输入源在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换
安装文件为Kafka_2.11-0.10.2.0.tgz,前面的2.11就是该Kafka所支持的Scala版本号,后面的0.10.2.0是Kafka自身的版本号
对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包,下载地址:
http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0Spark Streaming提供了两种访问Kafka的API
Receiver-based Approach:最原始的API,启动若干特殊的Task(Receiver),从Kafka上拉取数据,保存成RDD供处
理
Direct Approach:新API,无需启动Receiver,每一轮Sparkjob直接从Kafka上读取数据
import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka010.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import scala.Tuple2; Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("topicA", "topicB"); JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) ); stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
事先设定一个滑动窗口的长度(也就是窗口的持续时间)
设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动
每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream
可以启动对这个小段DStream的计算
- window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream
- countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数
- reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算
- reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)
- countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率
SparkStreaming 一定要做CheckPoint
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkPointDir, new Function0<JavaStreamingContext>() { @Override public JavaStreamingContext call() throws Exception { return createContext(checkPointDir, host, port); } });
需要在跨批次之间维护状态时,就必须使用updateStateByKey操作
mapWithState由Spark Streaming自己维护历史状态信息,而不是借助外部存储系统, 比如redis
tweets.mapWithStates(tweet => updateMood(tweet))
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateWordCounts = wordPairs.mapWithState(StateSpec.function(new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) throws Exception { Option<Integer> stateCount = state.getOption(); Integer sum = one.orElse(0); if (stateCount.isDefined()) { sum += stateCount.get(); } state.update(sum); return new Tuple2<String, Integer>(word, sum); } }));
将DStream的每个RDD转变为另外一个RDD
– 参数: transformFunc: RDD[T] => RDD[U]
words.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { @Override public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { return rdd.distinct(); } });
将处理过的数据输出到外部系统,跟Spark SQL中的“action”一个概念
- saveAsTextFiles
- saveAsObjectFiles
- saveAsHadoopFiles
- 自定义输出 (用得最多)– foreachRDD
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。