Useful Features of Apache Spark and Hive



Let's try out a few methods and features of Spark and Apache Hive.

1. Spark and the countByValue function

Assume the following RDD of values:

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,5,5,5,6));

Our task is to create a new RDD in which the key is each unique item value from rddVal and the value is the number of times that item occurs in rddVal.

countByValue is a great tool for this:

%spark

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));

val countedRDD = sc.parallelize(rddVal.countByValue().toSeq);

countedRDD.collect();

Zeppelin output:

rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))
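
Note that countByValue returns a local Map to the driver, which is then re-parallelized above. For comparison, a minimal sketch (reusing the same rddVal) that keeps the counting fully distributed with map and reduceByKey:

%spark

// Equivalent, fully distributed counting: map each value to a
// (value, 1) pair and sum the ones per key.
val countedRDD2 = rddVal.map(v => (v, 1L)).reduceByKey(_ + _);

countedRDD2.collect();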

2. Spark and the countByKey function

Sometimes we have a (key, value) RDD and want to count how many times each key occurs. The countByKey action is a great tool for this!

%spark
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A",88), ("B",22), ("C",33)));

val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq);

countedKeys.collect();

Zeppelin output:

rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))
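
Like countByValue, countByKey collects the resulting Map on the driver, so it is best suited to a modest number of distinct keys. A hedged sketch of a fully distributed alternative, reusing rddKeyValues from above:

%spark

// Distributed alternative: count occurrences per key without
// first pulling a Map back to the driver.
val countedKeys2 = rddKeyValues.mapValues(_ => 1L).reduceByKey(_ + _);

countedKeys2.collect();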

3. Spark and the keyBy function

If you have an RDD of values and want to apply a function to each element so that the function's result becomes the key of the corresponding item in a new RDD, the keyBy function is your friend.

%spark

def multiply(num: Int):Int={
    return num*num;
}

val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));

val resRDD = inputRDD.keyBy(multiply);

resRDD.collect();

Zeppelin output:

multiply: (num: Int)Int
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
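
keyBy(f) is essentially shorthand for mapping each element x to the pair (f(x), x). A minimal sketch of the equivalent map call, reusing the multiply function and inputRDD above:

%spark

// Same pairing written explicitly: the function result becomes
// the key, the original element becomes the value.
val resRDD2 = inputRDD.map(x => (multiply(x), x));

resRDD2.collect();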

4. Apache Hive and switching to another database

%spark
import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("USE xademo");
hc.sql("SHOW TABLES").show();

Zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
res11: org.apache.spark.sql.DataFrame = [result: string]
+-------------------+-----------+
|          tableName|isTemporary|
+-------------------+-----------+
|call_detail_records|      false|
|   customer_details|      false|
|             genres|      false|
|            justone|      false|
|            mytable|      false|
|      mytablexademo|      false|
|   recharge_details|      false|
|          workersxa|      false|
+-------------------+-----------+
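
Once the database is selected, the same HiveContext can query its tables into DataFrames. A small sketch, assuming the customer_details table listed above is readable:

%spark

// Load a few rows of one of the xademo tables into a DataFrame;
// customer_details is taken from the SHOW TABLES output above.
val customersDF = hc.sql("SELECT * FROM customer_details LIMIT 5");

customersDF.show();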

5. DataFrame partitioning

Sometimes you are asked to save a DataFrame into HDFS in a specific format and you have to use dynamic partitioning. Let's illustrate how to do this.

Input file workers.txt:

[root@sandbox ~]# cat workers.txt 
1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA

Spark code:

%spark

// Dynamic partitioning when saving from Dataframe to HDFS

case class worker(id: Int, name: String, sex: String, country: String);

val fileRDD = sc.textFile("/tests/workers.txt");

val workerDF = fileRDD.map(line=>new worker(line.split(",")(0).toInt, 
                                        line.split(",")(1), 
                                        line.split(",")(2), 
                                        line.split(",")(3))).toDF();

// save dataframe also into Hive for further use
workerDF.saveAsTable("tableWorkers");

workerDF.write
        .mode("overwrite")
        .partitionBy("country")
        .json("/tests/partition/result");

Zeppelin output:

defined class worker
fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
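
The deprecation warning comes from calling saveAsTable directly on the DataFrame; in later Spark releases the DataFrameWriter API is the documented replacement. A hedged sketch of that form:

%spark

// Writer-based form of persisting the DataFrame as a Hive table.
workerDF.write.mode("overwrite").saveAsTable("tableWorkers");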

The result in HDFS is more interesting:

[root@sandbox ~]# hdfs dfs -ls /tests/partition/result
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=AUS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GBR
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GER
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=USA
[root@sandbox ~]#

Spark created one folder per partition, each holding the data grouped by that partition value (for example):

[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
{"id":1,"name":"Jerry","sex":"man"}
{"id":4,"name":"Rut","sex":"female"}
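Notice that the country column is not written inside the JSON files; it lives only in the directory names. Reading a single partition directory on its own therefore yields rows without that column (a minimal sketch):

%spark

// Read only the USA partition directory. Because Spark does not
// see the country=... directory structure here, the resulting
// DataFrame has no country column.
val usaDF = sqlContext.read.json("/tests/partition/result/country=USA");

usaDF.show();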

6. Reading partitioned HDFS data back into a DataFrame

%spark
val backDFJson = sqlContext.read.json("/tests/partition/result");

backDFJson.show();

Zeppelin output:

backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  7|   Adam|   man|    GER|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
+---+-------+------+-------+
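
Since country is a partition column here, filtering on it can let Spark prune partitions and read only the matching directories. A small sketch:

%spark

// Filter on the partition column; only the country=GBR directory
// needs to be scanned.
backDFJson.filter("country = 'GBR'").show();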

7. Apache Hive and dynamic partitioning of ORC tables

Apache Hive supports two kinds of partitioning:

  • Static partitioning
  • Dynamic partitioning

For more information I can recommend the following blog. The basic difference is that with static partitioning you must name the partition column with an explicit value that identifies the partition when saving the data. With dynamic partitioning, partitions are created on the fly if they do not exist; no explicit value is needed, only the partition column. The sketch below illustrates the contrast.
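
A minimal HiveQL sketch of the two INSERT forms, assuming a source table and a partitioned target table like the ones created below (tableWorkers and WorkersPartitioned):

%spark

import org.apache.spark.sql.hive.HiveContext;
val hcDemo = new HiveContext(sc);

// Static partitioning: the partition value is spelled out, and the
// SELECT list must not include the partition column.
hcDemo.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country='USA') " +
           " SELECT id, name, sex FROM tableWorkers WHERE country='USA' ");

// Dynamic partitioning: the partition value comes from the last
// column of the SELECT, and missing partitions are created on the
// fly (requires the hive.exec.dynamic.partition settings shown below).
hcDemo.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) " +
           " SELECT id, name, sex, country FROM tableWorkers ");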

Test task: let's create a workers ORC table dynamically partitioned by country and save data into it from the previously created table "tableWorkers".

%spark

// dynamic partitioning on Hive Table...
import org.apache.spark.sql.hive.HiveContext;

var hc = new HiveContext(sc);

hc.sql(" DROP TABLE IF EXISTS WorkersPartitioned ");
hc.sql(" CREATE TABLE WorkersPartitioned(id INT, name String, sex String) "+
   " PARTITIONED BY (country STRING) "+
   " STORED AS ORC "
);

hc.sql(" SET hive.exec.dynamic.partition=true ");
hc.sql(" SET hive.exec.dynamic.partition.mode=nonstrict ");

hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) SELECT id, name, sex, country FROM tableWorkers ");

hc.sql(" SELECT * FROM WorkersPartitioned ").show();

Note the PARTITION(country) clause in the code: we do not have to supply an exact country value, which is exactly what dynamic partitioning means.

Zeppelin output:

res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
res166: org.apache.spark.sql.DataFrame = []
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
|  7|   Adam|   man|    GER|
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
+---+-------+------+-------+

8. Apache Hive and describing the partitions of a table

import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("show partitions WorkersPartitioned").show();

Zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
+-----------+
|     result|
+-----------+
|country=AUS|
|country=GBR|
|country=GER|
|country=USA|
+-----------+
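
Building on this, individual partitions can also be managed with ALTER TABLE, for example dropping one and listing the rest (a hedged sketch):

%spark

// Drop a single partition of the ORC table; the other partitions
// and their data remain untouched.
hc.sql("ALTER TABLE WorkersPartitioned DROP IF EXISTS PARTITION (country='GER')");
hc.sql("SHOW PARTITIONS WorkersPartitioned").show();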
