Trying out a few methods and features of Spark and Apache Hive.
1. Spark and the countByValue function
Let's have the following RDD of values:
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));
Our task is to create a new RDD where each key is a unique item value from rddVal and the value is the number of times that item occurs in rddVal.
countByValue is a great tool for this:
%spark
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));
val countedRDD = sc.parallelize(rddVal.countByValue().toSeq);
countedRDD.collect();
Zeppelin output:
rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))
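Note that countByValue is an action: it returns a Scala Map to the driver, which is why it is re-parallelized above. For very large RDDs with many distinct values, a sketch of a fully distributed alternative (my own variant using reduceByKey, not from the original post) could look like this:
%spark
// Distributed alternative: counts stay in an RDD instead of a driver-side Map
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));
val countedRDD = rddVal.map(v => (v, 1L)).reduceByKey(_ + _);
countedRDD.collect();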
2. Spark and the countByKey function
Sometimes we have a (key, value) RDD and we want to count how many times each key occurs. The countByKey action function is a great tool for this!
%spark
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A", 88), ("B", 22), ("C", 33)));
val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq);
countedKeys.collect();
Zeppelin output:
rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))
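countByKey likewise collects a Map onto the driver. If the number of distinct keys might be large, one possible distributed sketch (same data, counting with reduceByKey) is:
%spark
// Counting key occurrences without collecting a Map to the driver
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A", 88), ("B", 22), ("C", 33)));
val countedDistributed = rddKeyValues.mapValues(_ => 1L).reduceByKey(_ + _);
countedDistributed.collect();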
3. Spark and the keyBy function
If you have an RDD of values and want to apply a function to each element, with the function's result becoming the item's key in the new RDD, the keyBy function is your friend.
%spark
def multiply(num: Int):Int={
return num*num;
}
val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));
val resRDD = inputRDD.keyBy(multiply);
resRDD.collect();
Zeppelin output:
multiply: (num: Int)Int
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
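Under the hood, keyBy(f) is just shorthand for mapping each element x to the pair (f(x), x); a small sketch illustrating the equivalence:
%spark
// keyBy(f) produces the same pairs as an explicit map to (f(x), x)
val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));
val viaKeyBy = inputRDD.keyBy(n => n * n);
val viaMap   = inputRDD.map(n => (n * n, n));
viaKeyBy.collect();  // Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
viaMap.collect();    // identical result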
4. Apache Hive and switching to another database
%spark
import org.apache.spark.sql.hive.HiveContext;
val hc = new HiveContext(sc);
hc.sql("USE xademo");
hc.sql("SHOW TABLES").show();
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
res11: org.apache.spark.sql.DataFrame = [result: string]
+-------------------+-----------+
|          tableName|isTemporary|
+-------------------+-----------+
|call_detail_records|      false|
|   customer_details|      false|
|             genres|      false|
|            justone|      false|
|            mytable|      false|
|      mytablexademo|      false|
|   recharge_details|      false|
|          workersxa|      false|
+-------------------+-----------+
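Instead of switching the current database with USE, a table can also be referenced by its fully qualified database.table name; a minimal sketch (assuming the same xademo database and its customer_details table shown above):
%spark
// Querying a table in another database without issuing USE
import org.apache.spark.sql.hive.HiveContext;
val hc = new HiveContext(sc);
hc.sql("SELECT * FROM xademo.customer_details LIMIT 5").show();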
5. DataFrame partitioning
Sometimes you are asked to save a DataFrame to HDFS in a specific format, and you are forced to use dynamic partitioning. Let's show how to do that:
Input file workers.txt:
[root@sandbox ~]# cat workers.txt
1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA
Spark code:
%spark
// Dynamic partitioning when saving from Dataframe to HDFS
case class worker(id: Int, name: String, sex: String, country: String);
val fileRDD = sc.textFile("/tests/workers.txt");
val workerDF = fileRDD.map(line => new worker(line.split(",")(0).toInt,
                                              line.split(",")(1),
                                              line.split(",")(2),
                                              line.split(",")(3))).toDF();
// save dataframe also into Hive for further use
workerDF.saveAsTable("tableWorkers");
workerDF.write
  .mode("overwrite")
  .partitionBy("country")
  .json("/tests/partition/result");
Zeppelin output:
defined class worker
fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
More interesting is the result in HDFS:
[root@sandbox ~]# hdfs dfs -ls /tests/partition/result
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=AUS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GBR
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GER
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=USA
[root@sandbox ~]#
Spark created a folder for each partition value, each holding the data grouped by that value (for example):
[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
{"id":1,"name":"Jerry","sex":"man"}
{"id":4,"name":"Rut","sex":"female"}
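Notice that the country value is stored only in the directory name, not inside the JSON files. A single partition can also be read directly; a sketch (in that case Spark sees no partition directories under the given path, so no country column is reconstructed):
%spark
// Reading one partition folder directly; only id, name and sex come back
val usaDF = sqlContext.read.json("/tests/partition/result/country=USA");
usaDF.show();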
6. Reading partitioned HDFS data back into a DataFrame
%spark
val backDFJson = sqlContext.read.json("/tests/partition/result");
backDFJson.show();
Zeppelin output:
backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  7|   Adam|   man|    GER|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
+---+-------+------+-------+
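Because the partition column is rebuilt from the directory names, it can be used in filters, and Spark can then prune the scan to just the matching country=... folders; a small sketch:
%spark
// Filtering on the partition column (allows partition pruning)
val usaOnly = backDFJson.where(backDFJson("country") === "USA");
usaOnly.show();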
7. Apache Hive and dynamic partitioning of ORC tables
Apache Hive supports two kinds of partitioning:
- static partitioning
- dynamic partitioning
For more information I can recommend the following blog. The basic difference is that when you save data into a statically partitioned table, you have to name the partition column with the exact value that identifies the partition. In the case of dynamic partitioning, partitions are created on the fly if they do not exist; no value is required, just the partition column.
Test task: let's create an ORC table of workers, dynamically partitioned by country, and save data into it from the previously created table "tableWorkers".
%spark
// dynamic partitioning on Hive Table...
import org.apache.spark.sql.hive.HiveContext;
var hc = new HiveContext(sc);
hc.sql(" DROP TABLE IF EXISTS WorkersPartitioned ");
hc.sql(" CREATE TABLE WorkersPartitioned(id INT, name String, sex String) " +
       " PARTITIONED BY (country STRING) " +
       " STORED AS ORC ");
hc.sql(" SET hive.exec.dynamic.partition=true ");
hc.sql(" SET hive.exec.dynamic.partition.mode=nonstrict ");
hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) SELECT id, name, sex, country FROM tableWorkers ");
hc.sql(" SELECT * FROM WorkersPartitioned ").show();
Note the "PARTITION(country)" part of the code: we do not have to supply an exact country value; that is dynamic partitioning.
Zeppelin output:
res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
res166: org.apache.spark.sql.DataFrame = []
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
|  7|   Adam|   man|    GER|
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
+---+-------+------+-------+
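For contrast, the static-partitioning variant would have to spell out every partition value explicitly; a hedged sketch of the equivalent insert for a single country (reusing the hc context from above):
%spark
// Static partitioning: the partition value must be named in the statement
hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country='USA') " +
       " SELECT id, name, sex FROM tableWorkers WHERE country = 'USA' ");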
8. Apache Hive and describing the partitions of a table
%spark
import org.apache.spark.sql.hive.HiveContext;
val hc = new HiveContext(sc);
hc.sql("show partitions WorkersPartitioned").show();
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
+-----------+
|     result|
+-----------+
|country=AUS|
|country=GBR|
|country=GER|
|country=USA|
+-----------+
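The same partition specs can be used for maintenance, for example dropping a single partition with standard HiveQL; a sketch (reusing hc from above):
%spark
// Dropping one partition by its partition spec
hc.sql("ALTER TABLE WorkersPartitioned DROP IF EXISTS PARTITION (country='GER')");
hc.sql("show partitions WorkersPartitioned").show();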