RDD编程练习

栏目: Scala · 发布时间: 6年前

内容简介:1、使用sc.textFile(“文件的路径”)从文件系统中加载,sc是SparkContext2、通过并行集合创建RDD操作转换得到的RDD是

RDD

RDD创建

1、使用sc.textFile(“文件的路径”)从文件系统中加载,sc是SparkContext

2、通过并行集合创建

val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)//sc是SparkContext

RDD操作转换得到的RDD是 惰性操作 ,也就是说, 整个转换( transformation)过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到动作(Action)操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作

常用的转换操作( transformation)

  • filter(func) 筛选出满足函数func的元素,并返回一个新的数据集

  • map(func) 将每个元素传递到函数func中,并将结果返回为一个新的数据集 一对一

  • flatMap(func) 与map()相似,但每个输入元素都可以映射到0或多个输出结果 可以一对多
  • groupByKey() 应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
  • reduceByKey(func) 应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

常用的动作操作(Action)

  • count() 返回数据集中的元素个数
  • collect() 以数组的形式返回数据集中的所有元素
  • first() 返回数据集中的第一个元素
  • take(n) 以数组的形式返回数据集中的前n个元素
  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  • foreach(func) 将数据集中的每个元素传递到函数func中运行

RDD持久化RDD采用惰性求值的机制, 每次遇到行动操作,都会从头开始执行计算 。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的

persist()方法可以 标记 为持久化

  • persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容
  • persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上
    一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)
    可以使用unpersist()方法手动地把持久化的RDD从缓存中移除

RDD分区

为什么要分区 : 增加并行度,减少通信开销

RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置 spark.default.parallelism 这个参数的值,来配置默认的分区数目

一般而言:

  • 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
  • Apache Mesos:默认的分区数为8
  • Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

手动创建分区

sc.textFile("文件的路径",2) //2就是分区数
sc.parallelize(array,2)  //2就是分区数
repartition(1) //此方法可以重新定义分区 

Pair RDD

键值对RDD 主要使用map()函数来实现

常用的对键值对RDD的转换操作

reduceByKey(func)

groupByKey()

keys

values

sortByKey()默认升序 sortByKey(false)降序

mapValues(func)

join 输入(K,V1)和(K,V2)输出 (K,(V1,V2))

combineByKey

共享变量广播变量 Broadcast Variables

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

累加器Accumulators

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

数据的读写本地文件系统的数据读写

//读文件
scala> val  textFile = sc. textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
//写文件
scala> textFile. saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

读取hdfs上的文件

scala> val  textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> textFile.first()
//或者
scala> val textFile = sc.textFile("/user/hadoop/word.txt")
scala> val textFile = sc.textFile("word.txt")
//写文件
scala> textFile.saveAsTextFile("writeback")

读取json文件 并解析

val  jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")

使用JSON.parseFull(jsonString:String)解析

部署到集群进入到spark 安装目录

bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
  • –class 入口类名(例如org.apache.spark.examples.SparkPi)
  • –master 群集的主URL(例如spark://23.195.26.187:7077)
  • –deploy-mode 是在工作节点(cluster)上部署驱动程序还是在本地部署外部客户端(client)(默认值: client)
  • –conf:key = value格式的任意Spark配置属性。对于包含空格的值,在引号中包含“key = value”
  • application-jar:捆绑jar的路径,包括您的应用程序和所有依赖项。URL必须在群集内部全局可见,例如,所有节点上都存在的hdfs://路径或file://路径。
  • application-arguments:main 方法的传入参数

以上所述就是小编给大家介绍的《RDD编程练习》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

重来2

重来2

[美] 贾森·弗里德、[美] 戴维·海涅迈尔·汉森 / 苏西 / 中信出版社 / 2014-4-8 / 39.00元

“不再需要办公室”,这不仅仅是未来才有的事——它已经发生了。现在,轮到你迈开脚步,跟上时代的步伐了。 上百万的员工和成千上万的企业已经发现了远程工作的乐趣和好处。然而,远程工作方式还没有成为常见的选择。事实上,远程工作的技术手段都已齐备。还没有升级换代的,是人们的思想。 这本书的目的就是帮你把想法升级换代。作者会向你展示远程工作的诸多好处:可以找到最优秀的人才,从摧残灵魂的通勤路上解脱......一起来看看 《重来2》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具