Apache Spark 2一些使用案例

栏目: 编程工具 · 发布时间: 5年前

内容简介:1. registerTempTable 与createOrReplaceTempView以下面数据JSON为案例people.json:输入:

1. registerTempTable 与createOrReplaceTempView

以下面数据JSON为案例people.json:

{
    <font>"name"</font><font>: </font><font>"John"</font><font>, 
    </font><font>"age"</font><font>: </font><font>"28"</font><font>, 
    </font><font>"country"</font><font>: 
    </font><font>"UK"</font><font>
}
{
    </font><font>"name"</font><font>: </font><font>"Cathy"</font><font>, 
    </font><font>"age"</font><font>: </font><font>"30"</font><font>, 
    </font><font>"country"</font><font>: </font><font>"AUS"</font><font>
}
{
    </font><font>"name"</font><font>: </font><font>"Mark"</font><font>, 
    </font><font>"age"</font><font>: </font><font>"50"</font><font>, 
    </font><font>"country"</font><font>: </font><font>"USA"</font><font>
}
</font>

输入:

%spark2

val peopleJsonDF = sqlContext.read.format(<font>"json"</font><font>).load(</font><font>"/tests/people.json"</font><font>);

peopleJsonDF.createOrReplaceTempView(</font><font>"jsonPeople"</font><font>);

sqlContext.sql(</font><font>"select * from jsonPeople where age > 30"</font><font>).show();
</font>

Zeppelin output:

peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+----+
|age|country|name|
+---+-------+----+
| 50|    USA|Mark|
+---+-------+----+

结果:registerTempTable是Sparn 1.x语法,createOrReplaceTempView是Spark 2.x语法。两者都做同样的事情。

2. Apache Spark 2中的Spark-csv

任务:从JSON文件中获取数据,进行一些 排序 并将结果保存为CSV文件。Apache Spark 2在这里非常方便!

%spark2

val peopleJsonDF = sqlContext.read.option(<font>"multiline"</font><font>, <b>true</b>).format(</font><font>"json"</font><font>).load(</font><font>"/tests/people.json"</font><font>);

peopleJsonDF.createOrReplaceTempView(</font><font>"jsonPeople"</font><font>);

val orderedJsonDF = sqlContext.sql(</font><font>" select * from jsonPeople ORDER BY age DESC "</font><font>);

orderedJsonDF.show();

orderedJsonDF.write.mode(</font><font>"overwrite"</font><font>).option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>).option(</font><font>"header"</font><font>, <b>true</b>).format(</font><font>"csv"</font><font>).save(</font><font>"/tests/csvresult"</font><font>);
</font>

Zeppelin output:

peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
orderedJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+

HDFS output:

[root@sandbox ~]# hdfs dfs -ls /tests/csvresult
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-09-04 17:14 /tests/csvresult/_SUCCESS
-rw-r--r--   1 zeppelin hdfs         29 2018-09-04 17:14 /tests/csvresult/part-00000-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs         30 2018-09-04 17:14 /tests/csvresult/part-00001-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs         28 2018-09-04 17:14 /tests/csvresult/part-00002-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs          0 2018-09-04 17:14 /tests/csvresult/part-00003-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv

3.从CSV中推断模式并将结果保存到ORC中并通过Hive将其恢复

让我们将之前的结果'/ tests / csvresult'作为此任务的输入。

%spark2

<b>import</b> org.apache.spark.sql.hive.HiveContext;
<b>import</b> org.apache.spark.sql.types._;

<b>var</b> hiveContext = <b>new</b> HiveContext(sc);

<b>var</b> csvDF = sqlContext.read
            .option(<font>"header"</font><font>, <b>true</b>)
            .option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>)
            .option(</font><font>"inferSchema"</font><font>, <b>true</b>)
            .format(</font><font>"csv"</font><font>).load(</font><font>"/tests/csvresult"</font><font>);

csvDF.createOrReplaceTempView(</font><font>"csvTable"</font><font>);


<b>var</b> sortedDF = sqlContext.sql(</font><font>"select age,country,name from csvTable order by age desc"</font><font>);

sortedDF.write.mode(</font><font>"overwrite"</font><font>).format(</font><font>"orc"</font><font>).save(</font><font>"/tests/orcresult"</font><font>);

hiveContext.sql(</font><font>" DROP TABLE IF EXISTS people "</font><font>);
hiveContext.sql(</font><font>"CREATE EXTERNAL TABLE people (age INT, country String, name String) "</font><font>+
                        </font><font>" STORED AS ORC "</font><font>+
                        </font><font>" LOCATION '/tests/orcresult'"</font><font>
);

hiveContext.sql(</font><font>"select * from people"</font><font>).show();
</font>

Zeppelin output:

<b>import</b> org.apache.spark.sql.hive.HiveContext
<b>import</b> org.apache.spark.sql.types._
warning: there was one deprecation warning; re-run with -deprecation <b>for</b> details
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@4fc938b8
csvDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field]
sortedDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field]
res135: org.apache.spark.sql.DataFrame = []
res136: org.apache.spark.sql.DataFrame = []

+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+

4.删除格式错误的记录

假设有文件malformed.csv:

50|USA|Mark
30|AUS|Cathy
28|UK

并且任务是删除格式错误的记录并创建新的DF:

%spark2

val schema = StructType(Array(
  StructField(<font>"age"</font><font>, IntegerType, false),
  StructField(</font><font>"country"</font><font>, StringType, false),
  StructField(</font><font>"name"</font><font>, StringType, false)
));

val malformedDF = sqlContext.read.format(</font><font>"csv"</font><font>)
                        .schema(schema)
                        .option(</font><font>"mode"</font><font>, </font><font>"DROPMALFORMED"</font><font>)
                        .option(</font><font>"header"</font><font>, false)
                        .option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>)
                        .load(</font><font>"/tests/malformed.csv"</font><font>);

malformedDF.show();
</font>

Zeppelin output:

schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,IntegerType,false), StructField(country,StringType,false), StructField(name,StringType,false))
malformedDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
+---+-------+-----+

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

素数之恋

素数之恋

(美)约翰·德比希尔 / 陈为蓬 / 上海科技教育出版社 / 2008-12-01 / 34.00元

1859年8月,没什么名气的32岁数学家黎曼向柏林科学院提交了一篇论文,题为“论小于一个给定值的素数的个数”。在这篇论文的中间部分,黎曼作了一个附带的备注——一个猜测,一个假设。他向那天被召集来审查论文的数学家们抛出的这个问题,结果在随后的年代里给无数的学者产生了近乎残酷的压力。时至今日,在经历了150年的认真研究和极力探索后,这个问题仍然悬而未决。这个假设成立还是不成立? 已经越来越清楚,......一起来看看 《素数之恋》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具