内容简介:sparktransformationaction
spark 算子分为两大种,一种是 transformation 算子,另一种是 action 算子。
transformation 又叫转换算子,它从一个 RDD 到另一个 RDD 是延迟执行的,不会马上触发作业的提交,只有在后续遇到某个 action 算子时才执行;
action 算子会触发 SparkContext 提交 Job ,并将数据输出 spark 系统。今天举例讲解一下 action 算子。
1) count
就是统计 RDD 中元素个数的算子。
举个栗子:
val rdd = sc.parallelize(
List( "hello" , "world!" , "hi" , "beijing" ))
println (rdd.count())
输出:
4
2) co llect
把 RDD 中的元素提取到 driver 内存中,返回数组形式。
举个栗子:
val rdd = sc .parallelize(
List ( "hello" , "world!" , "hi" , "beijing" ) , 2 )
val arr : Array[ String ] = rdd.collect()
println ( arr )
arr .foreach( println )
输出:
[Ljava.lang.String;@760e8cc0
hello
world!
hi
beijing
3) foreach
遍历 RDD 中的每一个元素,无返回值。此算子用法参考上下文。
4) saveAsTextFile
把 RDD 中的数据以文本的形式保存
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
rdd.saveAsTextFile(
"/home/myname/test" )
5) saveAsSequenceFile
是个 k-v 算子,把 RDD 中的数据以序列化的形式保存。使用此算子的前提是 RDD 中元素是键值对格式。
举个栗子:
val rdd = sc .parallelize(
List (( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
rdd.saveAsSequenceFile(
"/home/myname/test" )
6) countByKey
是个 k-v 算子,按 key 统计各 key 的次数,返回 Map
举个栗子:
val rdd = sc.parallelize(
List(( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
val res: Map[ String , Long ] = rdd
.countByKey()
res.foreach( println )
输出:
(b,1)
(a,1)
(c,2)
7) collectAsMap
把 RDD 中元素以 Map 形式提取到 driver 端。需要注意的是如果存在多个相同 key ,后面出现的会覆盖前面的。
举个栗子:
val rdd = sc .parallelize(
List (( "a" , 1 ) , ( "b" , 2 ) , ( "c" , 3 ) , ( "c" , 4 )) , 2 )
val res: Map[ String , Int ] = rdd
.collectAsMap()
res.foreach( println )
输出:
(b,2)
(a,1)
(c,4)
8) take
从 RDD 中取下标前 n 个元素,不排序。返回数组。
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd
.take( 2 )
take.foreach( println )
输出:
5
4
9) takeSample
从指定 RDD 中抽取样本。第一个参数为 false 表示取过的元素不再取,为 true 表示取过的元素可以再次被抽样;第二个参数表示取样数量;第三个参数不好把握建议默认值
举个栗子:
val rdd = sc .makeRDD(
Array ( "aaa" , "bbb" , "ccc" , "ddd" , "eee" ))
val sample: Array[ String ] = rdd
.takeSample( false , 2 )
sample.foreach( println )
输出:
eee
bbb
10) first
返回 RDD 中第一个元素。
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val first: Int = rdd.first()
println (first)
输出:
5
11) top
从 RDD 中按默认顺序 ( 降序 ) 或指定顺序取 n 个元素
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd.top( 2 )
take.foreach(println)
输出:
9
7
12) takeOrdered
从 RDD 中取 n 个元素,与 top 算子不同的是它是以和 top 相反的顺序返回元素。
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
val take: Array[ Int ] = rdd
.takeOrdered( 2 )
take.foreach( println )
输出:
1
4
13) saveAsObjectFile
把 RDD 中元素序列化并保存,底层依赖 saveAsSequenceFile
举个栗子:
val rdd = sc.parallelize(
List( 5 , 4 , 7 , 1 , 9 ) , 3 )
rdd.saveAsObjectFile(
"/home/myname/test" )
14)reduce
reduce 参数是一个函数,把 RDD 中的元素两两传递给此函数,然后进行计算产生一个结果,结果再与下一个值计算,如此迭代。
举个栗子:
val rdd = sc .makeRDD(
List ( 1 , 2 , 3 , 4 , 5 ))
val result: Int = rdd
.reduce((x , y) => x + y)
println (result)
输出:
15
15) lookup
是个 k-v 算子,指定 key 值,返回此 key 对应的所有 v 值
举个栗子:
val rdd1 = sc.makeRDD(
Array(( "A" , 0 ) , ( "A" , 2 ) ,
( "B" , 1 ) , ( "B" , 2 ) , ( "C" , 1 )))
val rdd2: Seq [ Int ] = rdd1
.lookup( "A" )
rdd2.foreach( println )
输出:
0
2
16) aggregate
aggregate 用户聚合 RDD 中的元素,先指定初始值,再对 RDD 中元素进行局部求和,最后全局求和。此算子理解起来不是特别直观,大家感受一下。
举个栗子:
val rdd = sc.parallelize(
List( 1 , 2 , 3 , 4 ))
val res: Int = rdd
.aggregate( 2 )(_+_ , _+_)
println (res)
输出:
14
17) fold
fold是aggregate的简化
举个栗子:
val rdd = sc .parallelize(
List ( 1 , 2 , 3 , 4 ))
val res: Int = rdd
.fold( 2 )((x , y) => x + y)
println (res)
输出:
14
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Python 图像处理 OpenCV (13): Scharr 算子和 LOG 算子边缘检测技术
- Option和Result相关的组合算子
- Apache Flink 漫谈系列 - JOIN 算子
- Spark 系列(四)—— RDD常用算子详解
- 边缘检测原理 - Sobel, Laplace, Canny算子
- Spark 性能调优:RDD 算子调优篇
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。