内容简介:最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。不会产生新的RDD,而是直接运行,得到我们想要的结果。
最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询 Elasticsearch
的接口进行统计分析,但是这个时间间隔不好把握,并且 Elasticsearch
并不适合特别实时的查询操作。实时的分布式流计算引擎首推 Spark
,它与 Hadoop
等相比的优势在 这里
讲得比较清楚了。
try...catch... pip install pyspark
基本运算
- 下面是所有运算方法的集合,其中有些方法仅用于键值对,有些方法仅用于数据流
Transformation(转换)
这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。
datas.map(lambda a, (a, 1))
Action(执行)
不会产生新的RDD,而是直接运行,得到我们想要的结果。
- collect(): 以数组的形式,返回数据集中所有的元素
- count(): 返回数据集中元素的个数
- take(n): 返回数据集的前N个元素
- takeOrdered(n): 升序排列,取出前N个元素
- takeOrdered(n, lambda x: -x): 降序排列,取出前N个元素
- first(): 返回数据集的第一个元素
- min(): 取出最小值
- max(): 取出最大值
- stdev(): 计算标准差
- sum(): 求和
- mean(): 平均值
- countByKey(): 统计各个key值对应的数据的条数
- lookup(key): 根据传入的key值来查找对应的Value值
- foreach(func): 对集合中每个元素应用func
Persistence(持久化)
- persist(): 将数据按默认的方式进行持久化
- unpersist(): 取消持久化
- saveAsTextFile(path): 将数据集保存至文件
应用场景
创建简单的RDD
from pyspark.sql import SparkConf, SparkContext rdd = sc.parallelize(['abc', def']) // 直接创建rdd
读取CSV文件
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.some.config.option", "一些设置") \
.getOrCreate()
df = spark.read.csv("/home/Users/haofly/test.csv", header=True, sep="|") # 读取文件
print(df.collect())
Spark Streaming流计算
Spark Streaming StreamingContext
从文件流读取数据
conf = SparkConf().setAppName("test").setMaster("local[2]") # 表示运行在本地模式,并且启动2个工作线程
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30) # 每隔10秒钟自动进行一次流计算
lines = ssc.textFileStream('file:///Users/haofly/log')
words = lines.map(lambda line: line.strip())
words.pprint()
ssc.start()
ssc.awaitTermination()
从kafka读取数据
首先得从 maven仓库
下载对应的版本,注意这里需要下载 assembly
的包,这里的2.11是scala的版本,2.4.3是pyspark的版本号,也是spark的版号,如果下载后的包不能用,那就尝试换一个版本吧。可以通过这篇文章搭建测试用的kafka集群
# 指定spark-streaming-kafka的jar包
os.environ[
"PYSPARK_SUBMIT_ARGS"
] = "--jars /test/jars/kafka/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell"
conf = SparkConf().setAppName("test").setMaster("local[2]") # 表示运行在本地模式,并且启动2个工作线程
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10) # 每隔10秒钟自动进行一次流计算
zkQuorum, topic = "zookeeper:2181", "test"
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
def myadd(a, b): # 只能在传入的函数中捕获异常
try:
return a+b
catch:
pass # tolog
def myadd_inv(a, b):
return a-b
rdd = lines.map(lambda x: (x, 1)).reduceByKeyAndWIndow(myadd, myadd_inv, 60) # 统计时间窗口60秒内的数据
rdd.pprint() # 每次统计都打印rdd的数据
ssc.start() # 异步执行
ssc.awaitTermination() # 等待终止信号
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大思维:集体智慧如何改变我们的世界
杰夫·摩根 / 郭莉玲、尹玮琦、徐强 / 中信出版集团股份有限公司 / 2018-8-1 / CNY 65.00
智能时代,我们如何与机器互联,利用技术来让我们变得更聪明?为什么智能技术不会自动导致智能结果呢?线上线下群体如何协作?社会、政府或管理系统如何解决复杂的问题?本书从哲学、计算机科学和生物学等领域收集见解,揭示了如何引导组织和社会充分利用人脑和数字技术进行大规模思考,从而提高整个集体的智力水平,以解决我们时代的巨大挑战。是英国社会创新之父的洞见之作,解析企业、群体、社会如何明智决策、协作进化。一起来看看 《大思维:集体智慧如何改变我们的世界》 这本书的介绍吧!
RGB HSV 转换
RGB HSV 互转工具
RGB CMYK 转换工具
RGB CMYK 互转工具