内容简介:1. registerTempTable 与createOrReplaceTempView以下面数据JSON为案例people.json:输入:
1. registerTempTable 与createOrReplaceTempView
以下面数据JSON为案例people.json:
{
<font>"name"</font><font>: </font><font>"John"</font><font>,
</font><font>"age"</font><font>: </font><font>"28"</font><font>,
</font><font>"country"</font><font>:
</font><font>"UK"</font><font>
}
{
</font><font>"name"</font><font>: </font><font>"Cathy"</font><font>,
</font><font>"age"</font><font>: </font><font>"30"</font><font>,
</font><font>"country"</font><font>: </font><font>"AUS"</font><font>
}
{
</font><font>"name"</font><font>: </font><font>"Mark"</font><font>,
</font><font>"age"</font><font>: </font><font>"50"</font><font>,
</font><font>"country"</font><font>: </font><font>"USA"</font><font>
}
</font>
输入:
%spark2 val peopleJsonDF = sqlContext.read.format(<font>"json"</font><font>).load(</font><font>"/tests/people.json"</font><font>); peopleJsonDF.createOrReplaceTempView(</font><font>"jsonPeople"</font><font>); sqlContext.sql(</font><font>"select * from jsonPeople where age > 30"</font><font>).show(); </font>
Zeppelin output:
peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field] +---+-------+----+ |age|country|name| +---+-------+----+ | 50| USA|Mark| +---+-------+----+
结果:registerTempTable是Sparn 1.x语法,createOrReplaceTempView是Spark 2.x语法。两者都做同样的事情。
2. Apache Spark 2中的Spark-csv
任务:从JSON文件中获取数据,进行一些 排序 并将结果保存为CSV文件。Apache Spark 2在这里非常方便!
%spark2 val peopleJsonDF = sqlContext.read.option(<font>"multiline"</font><font>, <b>true</b>).format(</font><font>"json"</font><font>).load(</font><font>"/tests/people.json"</font><font>); peopleJsonDF.createOrReplaceTempView(</font><font>"jsonPeople"</font><font>); val orderedJsonDF = sqlContext.sql(</font><font>" select * from jsonPeople ORDER BY age DESC "</font><font>); orderedJsonDF.show(); orderedJsonDF.write.mode(</font><font>"overwrite"</font><font>).option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>).option(</font><font>"header"</font><font>, <b>true</b>).format(</font><font>"csv"</font><font>).save(</font><font>"/tests/csvresult"</font><font>); </font>
Zeppelin output:
peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field] orderedJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field] +---+-------+-----+ |age|country| name| +---+-------+-----+ | 50| USA| Mark| | 30| AUS|Cathy| | 28| UK| John| +---+-------+-----+
HDFS output:
[root@sandbox ~]# hdfs dfs -ls /tests/csvresult Found 5 items -rw-r--r-- 1 zeppelin hdfs 0 2018-09-04 17:14 /tests/csvresult/_SUCCESS -rw-r--r-- 1 zeppelin hdfs 29 2018-09-04 17:14 /tests/csvresult/part-00000-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv -rw-r--r-- 1 zeppelin hdfs 30 2018-09-04 17:14 /tests/csvresult/part-00001-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv -rw-r--r-- 1 zeppelin hdfs 28 2018-09-04 17:14 /tests/csvresult/part-00002-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv -rw-r--r-- 1 zeppelin hdfs 0 2018-09-04 17:14 /tests/csvresult/part-00003-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
3.从CSV中推断模式并将结果保存到ORC中并通过Hive将其恢复
让我们将之前的结果'/ tests / csvresult'作为此任务的输入。
%spark2
<b>import</b> org.apache.spark.sql.hive.HiveContext;
<b>import</b> org.apache.spark.sql.types._;
<b>var</b> hiveContext = <b>new</b> HiveContext(sc);
<b>var</b> csvDF = sqlContext.read
.option(<font>"header"</font><font>, <b>true</b>)
.option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>)
.option(</font><font>"inferSchema"</font><font>, <b>true</b>)
.format(</font><font>"csv"</font><font>).load(</font><font>"/tests/csvresult"</font><font>);
csvDF.createOrReplaceTempView(</font><font>"csvTable"</font><font>);
<b>var</b> sortedDF = sqlContext.sql(</font><font>"select age,country,name from csvTable order by age desc"</font><font>);
sortedDF.write.mode(</font><font>"overwrite"</font><font>).format(</font><font>"orc"</font><font>).save(</font><font>"/tests/orcresult"</font><font>);
hiveContext.sql(</font><font>" DROP TABLE IF EXISTS people "</font><font>);
hiveContext.sql(</font><font>"CREATE EXTERNAL TABLE people (age INT, country String, name String) "</font><font>+
</font><font>" STORED AS ORC "</font><font>+
</font><font>" LOCATION '/tests/orcresult'"</font><font>
);
hiveContext.sql(</font><font>"select * from people"</font><font>).show();
</font>
Zeppelin output:
<b>import</b> org.apache.spark.sql.hive.HiveContext <b>import</b> org.apache.spark.sql.types._ warning: there was one deprecation warning; re-run with -deprecation <b>for</b> details hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@4fc938b8 csvDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field] sortedDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field] res135: org.apache.spark.sql.DataFrame = [] res136: org.apache.spark.sql.DataFrame = [] +---+-------+-----+ |age|country| name| +---+-------+-----+ | 50| USA| Mark| | 30| AUS|Cathy| | 28| UK| John| +---+-------+-----+
4.删除格式错误的记录
假设有文件malformed.csv:
50|USA|Mark 30|AUS|Cathy 28|UK
并且任务是删除格式错误的记录并创建新的DF:
%spark2
val schema = StructType(Array(
StructField(<font>"age"</font><font>, IntegerType, false),
StructField(</font><font>"country"</font><font>, StringType, false),
StructField(</font><font>"name"</font><font>, StringType, false)
));
val malformedDF = sqlContext.read.format(</font><font>"csv"</font><font>)
.schema(schema)
.option(</font><font>"mode"</font><font>, </font><font>"DROPMALFORMED"</font><font>)
.option(</font><font>"header"</font><font>, false)
.option(</font><font>"sep"</font><font>, </font><font>"|"</font><font>)
.load(</font><font>"/tests/malformed.csv"</font><font>);
malformedDF.show();
</font>
Zeppelin output:
schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,IntegerType,false), StructField(country,StringType,false), StructField(name,StringType,false)) malformedDF: org.apache.spark.sql.DataFrame = [age: <b>int</b>, country: string ... 1 more field] +---+-------+-----+ |age|country| name| +---+-------+-----+ | 50| USA| Mark| | 30| AUS|Cathy| +---+-------+-----+
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Terraform使用案例
- 【Git】之Git使用案例
- 结合案例使用 Java 注解和反射
- thinkphp Hook行为的使用案例
- scala匹配案例使用:: for lists
- redis使用场景和java测试案例
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web2.0策划指南
艾美 / 2009-11 / 32.00元
《Web2.0策划指南(影印版)》是讲述战略的。书中的示例关注的是Web 20的效率,而不聚焦于技术。你将了解到这样一个事实:创建Web 20业务或将Web 20战略整合到业务中,意味着创建一个吸引人们前来访问的在线站点,让人们愿意到这里来共享他们的思想、见闻和行动。当人们通过Web走到一起时,可能得到总体远远大于各部分和的结果。随着传统的“口碑传诵”助推站点高速成长,客户本身就能够帮助建立站点。......一起来看看 《Web2.0策划指南》 这本书的介绍吧!