Useful features of Apache Spark and Hive

Category: Programming Tools · Published: 7 years ago


Let's try out a few methods and features of Spark and Apache Hive.

1. Spark and the countByValue function

Consider the following RDD of values:

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));

Our task is to create a new RDD where each key is a unique value from rddVal and each value is the number of times that item occurs in rddVal.

countByValue is a great tool for this:

%spark

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));

val countedRDD = sc.parallelize(rddVal.countByValue().toSeq);

countedRDD.collect();

zeppelin output:

rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))
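
Note that countByValue is an action: it ships a scala.collection.Map of counts back to the driver, which is why the snippet wraps the result in sc.parallelize to get an RDD again. If the number of distinct values is large, a fully distributed alternative (my sketch, not from the original post) is map plus reduceByKey:

%spark

// Same counts, but computed entirely on the cluster:
// no intermediate Map is collected to the driver.
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));

val countedRDD = rddVal.map(x => (x, 1L)).reduceByKey(_ + _);

countedRDD.collect();

The result is the same array of (value, count) pairs, possibly in a different order.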

2. Spark and the countByKey function

Sometimes we have a (key, value) RDD and want to count the occurrences of each key. The countByKey action function is a great tool for this!

%spark
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A",88), ("B",22), ("C",33)));

val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq);

countedKeys.collect();

zeppelin output:

rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))
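
As with countByValue, countByKey is an action and returns a Map[K, Long] to the driver, so it is best reserved for a small number of distinct keys. A distributed sketch (my addition, not from the original post) that keeps the counts in an RDD:

%spark

// Count keys without collecting a Map to the driver.
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A",88), ("B",22), ("C",33)));

val countedKeys = rddKeyValues.map{ case (k, _) => (k, 1L) }.reduceByKey(_ + _);

countedKeys.collect();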

3. Spark and the keyBy function

If you have an RDD of values and want to apply a function to each element, with the function's result becoming the key of each item in the new RDD, the keyBy function is your friend.

%spark

def multiply(num: Int):Int={
    return num*num;
}

val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));

val resRDD = inputRDD.keyBy(multiply);

resRDD.collect();

zeppelin output:

multiply: (num: Int)Int
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
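
keyBy(f) is simply shorthand for mapping each element x to the pair (f(x), x). The following equivalent sketch (my addition, reusing the same multiply function) makes that explicit:

%spark

def multiply(num: Int): Int = {
    return num*num;
}

val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));

// Equivalent to inputRDD.keyBy(multiply): the function result
// becomes the key, the original element becomes the value.
val resRDD = inputRDD.map(x => (multiply(x), x));

resRDD.collect();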

4. Apache Hive: switching to another database

%spark
import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("USE xademo");
hc.sql("SHOW TABLES").show();

zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
res11: org.apache.spark.sql.DataFrame = [result: string]
+-------------------+-----------+
|          tableName|isTemporary|
+-------------------+-----------+
|call_detail_records|      false|
|   customer_details|      false|
|             genres|      false|
|            justone|      false|
|            mytable|      false|
|      mytablexademo|      false|
|   recharge_details|      false|
|          workersxa|      false|
+-------------------+-----------+

5. Dataframe partitioning

Sometimes you are asked to save a dataframe to HDFS in a specific format, and you are forced to use dynamic partitioning. Let's demonstrate how to do that:

Input file workers.txt:

[root@sandbox ~]# cat workers.txt 
1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA

Spark code:

%spark

// Dynamic partitioning when saving from Dataframe to HDFS

case class worker(id: Int, name: String, sex: String, country: String);

val fileRDD = sc.textFile("/tests/workers.txt");

val workerDF = fileRDD.map(line=>new worker(line.split(",")(0).toInt, 
                                        line.split(",")(1), 
                                        line.split(",")(2), 
                                        line.split(",")(3))).toDF();

// save dataframe also into Hive for further use
workerDF.saveAsTable("tableWorkers");

workerDF.write
        .mode("overwrite")
        .partitionBy("country")
        .json("/tests/partition/result");

zeppelin output:

defined class worker
fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
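
The deprecation warning comes from calling saveAsTable directly on the DataFrame; since Spark 1.4 the DataFrameWriter API is preferred. A sketch of the equivalent, non-deprecated call:

%spark

// Non-deprecated form of workerDF.saveAsTable("tableWorkers")
workerDF.write.mode("overwrite").saveAsTable("tableWorkers");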

The more interesting result is in HDFS:

[root@sandbox ~]# hdfs dfs -ls /tests/partition/result
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=AUS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GBR
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GER
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=USA
[root@sandbox ~]#

Spark created a folder for each partition, containing that partition's grouped data, for example:

[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
{"id":1,"name":"Jerry","sex":"man"}
{"id":4,"name":"Rut","sex":"female"}

6. Reading partitioned HDFS data back into a dataframe

%spark
val backDFJson = sqlContext.read.json("/tests/partition/result");

backDFJson.show();

zeppelin output:

backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  7|   Adam|   man|    GER|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
+---+-------+------+-------+
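
Notice that country came back as a regular column even though it was stored only in the directory names: Spark's partition discovery recovers it. A small sketch (my addition) that should make Spark read only the country=USA directory, thanks to partition pruning:

%spark

// Filtering on the partition column lets Spark skip
// the other country=... directories entirely.
val usaDF = sqlContext.read.json("/tests/partition/result")
                      .where("country = 'USA'");

usaDF.show();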

7. Apache Hive and dynamic partitioning of ORC tables

Apache Hive supports two kinds of partitioning:

  • static partitioning
  • dynamic partitioning

For more information I can recommend the following blog. The basic difference is that when you save data into a statically partitioned table, you must name the partition column with the exact value that distinguishes the partition. With dynamic partitioning, partitions are created if they do not yet exist; no value is required, only the partition column.
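
For contrast, a static-partition insert names the partition value explicitly, so each statement loads exactly one partition. A sketch of that form (my addition, assuming the HiveContext hc and the tables from the next example):

%spark

// Static partitioning: the partition value is hard-coded in the
// statement, and the SELECT must not include the partition column.
hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country='USA') " +
       " SELECT id, name, sex FROM tableWorkers WHERE country = 'USA' ");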

Test task: let's create a workers ORC table dynamically partitioned by country, and save data into it from the previously created table "tableWorkers".

%spark

// dynamic partitioning on Hive Table...
import org.apache.spark.sql.hive.HiveContext;

var hc = new HiveContext(sc);

hc.sql(" DROP TABLE IF EXISTS WorkersPartitioned ");
hc.sql(" CREATE TABLE WorkersPartitioned(id INT, name String, sex String) "+
   " PARTITIONED BY (country STRING) "+
   " STORED AS ORC "
);

hc.sql(" SET hive.exec.dynamic.partition=true ");
hc.sql(" SET hive.exec.dynamic.partition.mode=nonstrict ");

hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) SELECT id, name, sex, country FROM tableWorkers ");

hc.sql(" SELECT * FROM WorkersPartitioned ").show();

Note the "PARTITION(country)" part of the code: we did not have to supply an exact country value, i.e. dynamic partitioning.

Zeppelin output:

res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
res166: org.apache.spark.sql.DataFrame = []
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
|  7|   Adam|   man|    GER|
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
+---+-------+------+-------+

8. Apache Hive: describing the partitions of a table

import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("show partitions WorkersPartitioned").show();

Zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
+-----------+
|     result|
+-----------+
|country=AUS|
|country=GBR|
|country=GER|
|country=USA|
+-----------+
