Trying out a few Spark and Apache Hive methods and features.
1. Spark and the countByValue function
Let's take the following RDD of values:
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));
Our task is to create a new RDD where each key is a distinct value from rddVal and each value is the number of times that item occurs in rddVal.
countByValue is a good tool for this:
%spark
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));
val countedRDD = sc.parallelize(rddVal.countByValue().toSeq);
countedRDD.collect();
Zeppelin output:
rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))
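Note that countByValue is an action, so the counts are first gathered into a Map on the driver before being re-parallelized. For large datasets a fully distributed equivalent can be built from map and reduceByKey; a minimal sketch (not from the original post), assuming the same rddVal as above:
%spark
// distributed alternative: the counts stay in the cluster instead of a driver-side Map
val countedRDD2 = rddVal.map(v => (v, 1L)).reduceByKey(_ + _);
countedRDD2.collect();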
2. Spark and the countByKey function
Sometimes we have a (key, value) RDD and want to count the occurrences of every key. The countByKey action is a good tool for that!
%spark
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A",88), ("B",22), ("C",33)));
val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq);
countedKeys.collect();
Zeppelin output:
rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))
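Like countByValue, countByKey returns a Map[K, Long] to the driver. If the counts should stay distributed, a rough equivalent, again assuming the rddKeyValues defined above, is:
%spark
// keep the per-key counts as an RDD instead of a driver-side Map
val countedKeys2 = rddKeyValues.mapValues(_ => 1L).reduceByKey(_ + _);
countedKeys2.collect();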
3. Spark and the keyBy function
If you have an RDD of values and want to apply a function to every element so that the function's result becomes the item key in a new RDD, the keyBy function is your friend.
%spark
def multiply(num: Int): Int = {
  return num*num;
}
val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));
val resRDD = inputRDD.keyBy(multiply);
resRDD.collect();
Zeppelin output:
multiply: (num: Int)Int
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
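Since resRDD is now a pair RDD, key-based operations become available on it. A small usage sketch (hypothetical, not part of the original example):
%spark
// look up the original numbers whose square (the key) equals 16; returns Seq(4)
resRDD.lookup(16);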
4. Apache Hive and switching to another database
%spark
import org.apache.spark.sql.hive.HiveContext;
val hc = new HiveContext(sc);
hc.sql("USE xademo");
hc.sql("SHOW TABLES").show();
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
res11: org.apache.spark.sql.DataFrame = [result: string]
+-------------------+-----------+
|          tableName|isTemporary|
+-------------------+-----------+
|call_detail_records|      false|
|   customer_details|      false|
|             genres|      false|
|            justone|      false|
|            mytable|      false|
|      mytablexademo|      false|
|   recharge_details|      false|
|          workersxa|      false|
+-------------------+-----------+
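Instead of switching the current database with USE, a table can also be addressed by its database-qualified name. A minimal sketch, assuming the xademo tables listed above:
%spark
// query a table without changing the current database
hc.sql("SELECT * FROM xademo.customer_details LIMIT 5").show();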
5. Dataframe partitioning
Sometimes you are asked to save a dataframe to HDFS in a specific format and are forced to use dynamic partitioning. Let's show how to do that:
Input file workers.txt:
[root@sandbox ~]# cat workers.txt
1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA
Spark code:
%spark
// Dynamic partitioning when saving from Dataframe to HDFS
case class worker(id: Int, name: String, sex: String, country: String);
val fileRDD = sc.textFile("/tests/workers.txt");
val workerDF = fileRDD.map(line => new worker(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2), line.split(",")(3))).toDF();
// save dataframe also into Hive for further use
workerDF.saveAsTable("tableWorkers");
workerDF.write
  .mode("overwrite")
  .partitionBy("country")
  .json("/tests/partition/result");
Zeppelin output:
defined class worker
fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
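The deprecation warning most likely comes from calling saveAsTable directly on the DataFrame; since Spark 1.4 the DataFrameWriter form is preferred. A small sketch of the replacement call:
%spark
// DataFrameWriter variant of the deprecated DataFrame.saveAsTable
workerDF.write.mode("overwrite").saveAsTable("tableWorkers");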
The more interesting result is in HDFS:
[root@sandbox ~]# hdfs dfs -ls /tests/partition/result
Found 5 items
-rw-r--r-- 1 zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=AUS
drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=GBR
drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=GER
drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=USA
[root@sandbox ~]#
Spark created a folder for each partition value, each containing the data belonging to that partition, for example:
[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
{"id":1,"name":"Jerry","sex":"man"}
{"id":4,"name":"Rut","sex":"female"}
6. Reading partitioned HDFS data back into a dataframe
%spark
val backDFJson = sqlContext.read.json("/tests/partition/result");
backDFJson.show();
Zeppelin output:
backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  7|   Adam|   man|    GER|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
+---+-------+------+-------+
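Because country is stored in the directory names rather than inside the JSON documents, it comes back as an ordinary column and can be filtered on like any other. A minimal sketch, assuming the same output path:
%spark
// read everything back, then keep only the USA partition
val usaDF = sqlContext.read.json("/tests/partition/result").filter("country = 'USA'");
usaDF.show();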
7. Apache Hive and dynamic partitioning of ORC tables
Apache Hive supports two kinds of partitioning:
- static partitioning
- dynamic partitioning
For more information I can recommend the following blog. The basic difference is that when you save data into a statically partitioned table, you have to name the partition column together with the concrete value that identifies the partition. In the case of dynamic partitioning, partitions are created if they do not exist yet; no values are required, only the partition column.
Test task: let's create an ORC table of workers dynamically partitioned by country and save data into it from the previously created table "tableWorkers".
%spark
// dynamic partitioning on Hive Table...
import org.apache.spark.sql.hive.HiveContext;
var hc = new HiveContext(sc);
hc.sql(" DROP TABLE IF EXISTS WorkersPartitioned ");
hc.sql(" CREATE TABLE WorkersPartitioned(id INT, name String, sex String) " +
       " PARTITIONED BY (country STRING) " +
       " STORED AS ORC ");
hc.sql(" SET hive.exec.dynamic.partition=true ");
hc.sql(" SET hive.exec.dynamic.partition.mode=nonstrict ");
hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) SELECT id, name, sex, country FROM tableWorkers ");
hc.sql(" SELECT * FROM WorkersPartitioned ").show();
Note the "PARTITION(country)" clause in the code: we do not have to supply a concrete country value, which is exactly what dynamic partitioning means.
Zeppelin output:
res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
res166: org.apache.spark.sql.DataFrame = []
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
|  7|   Adam|   man|    GER|
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
+---+-------+------+-------+
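For contrast, a static-partition insert has to name each partition value explicitly, and the partition column is then left out of the SELECT list. A hypothetical sketch reusing hc and the tables above (not part of the original example):
%spark
// static partitioning: the country value is hard-coded in the statement
hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country='USA') " +
       " SELECT id, name, sex FROM tableWorkers WHERE country='USA' ");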
8. Apache Hive and describing the partitions of a table
import org.apache.spark.sql.hive.HiveContext;
val hc = new HiveContext(sc);
hc.sql("show partitions WorkersPartitioned").show();
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
+-----------+
|     result|
+-----------+
|country=AUS|
|country=GBR|
|country=GER|
|country=USA|
+-----------+
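If more detail about a single partition is needed (location, storage format, and so on), Hive's DESCRIBE FORMATTED also accepts a partition spec. A small sketch, assuming the statement is passed through to Hive as a native command:
%spark
// inspect one partition of the ORC table
hc.sql("DESCRIBE FORMATTED WorkersPartitioned PARTITION(country='USA')").show(100);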