内容简介:最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。不会产生新的RDD,而是直接运行,得到我们想要的结果。
最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询 Elasticsearch
的接口进行统计分析,但是这个时间间隔不好把握,并且 Elasticsearch
并不适合特别实时的查询操作。实时的分布式流计算引擎首推 Spark
,它与 Hadoop
等相比的优势在 这里
讲得比较清楚了。
try...catch... pip install pyspark
基本运算
- 下面是所有运算方法的集合,其中有些方法仅用于键值对,有些方法仅用于数据流
Transformation(转换)
这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。
datas.map(lambda a, (a, 1))
Action(执行)
不会产生新的RDD,而是直接运行,得到我们想要的结果。
- collect(): 以数组的形式,返回数据集中所有的元素
- count(): 返回数据集中元素的个数
- take(n): 返回数据集的前N个元素
- takeOrdered(n): 升序排列,取出前N个元素
- takeOrdered(n, lambda x: -x): 降序排列,取出前N个元素
- first(): 返回数据集的第一个元素
- min(): 取出最小值
- max(): 取出最大值
- stdev(): 计算标准差
- sum(): 求和
- mean(): 平均值
- countByKey(): 统计各个key值对应的数据的条数
- lookup(key): 根据传入的key值来查找对应的Value值
- foreach(func): 对集合中每个元素应用func
Persistence(持久化)
- persist(): 将数据按默认的方式进行持久化
- unpersist(): 取消持久化
- saveAsTextFile(path): 将数据集保存至文件
应用场景
创建简单的RDD
from pyspark.sql import SparkConf, SparkContext rdd = sc.parallelize(['abc', def']) // 直接创建rdd
读取CSV文件
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("test") \ .config("spark.some.config.option", "一些设置") \ .getOrCreate() df = spark.read.csv("/home/Users/haofly/test.csv", header=True, sep="|") # 读取文件 print(df.collect())
Spark Streaming流计算
Spark Streaming StreamingContext
从文件流读取数据
conf = SparkConf().setAppName("test").setMaster("local[2]") # 表示运行在本地模式,并且启动2个工作线程 sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 30) # 每隔10秒钟自动进行一次流计算 lines = ssc.textFileStream('file:///Users/haofly/log') words = lines.map(lambda line: line.strip()) words.pprint() ssc.start() ssc.awaitTermination()
从kafka读取数据
首先得从 maven仓库
下载对应的版本,注意这里需要下载 assembly
的包,这里的2.11是scala的版本,2.4.3是pyspark的版本号,也是spark的版号,如果下载后的包不能用,那就尝试换一个版本吧。可以通过这篇文章搭建测试用的kafka集群
# 指定spark-streaming-kafka的jar包 os.environ[ "PYSPARK_SUBMIT_ARGS" ] = "--jars /test/jars/kafka/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell" conf = SparkConf().setAppName("test").setMaster("local[2]") # 表示运行在本地模式,并且启动2个工作线程 sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 10) # 每隔10秒钟自动进行一次流计算 zkQuorum, topic = "zookeeper:2181", "test" kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) lines = kvs.map(lambda x: x[1]) def myadd(a, b): # 只能在传入的函数中捕获异常 try: return a+b catch: pass # tolog def myadd_inv(a, b): return a-b rdd = lines.map(lambda x: (x, 1)).reduceByKeyAndWIndow(myadd, myadd_inv, 60) # 统计时间窗口60秒内的数据 rdd.pprint() # 每次统计都打印rdd的数据 ssc.start() # 异步执行 ssc.awaitTermination() # 等待终止信号
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。