Creating DataFrames in Spark

Category: Scala · Published: 7 years ago


A DataFrame is similar to a table in a relational database. To query the data in a DataFrame you call its API, which Spark so far provides in Scala, Java, R, and Python.

I. Start spark-shell

spark-shell --master yarn

II. Create a SparkSession

scala> import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097
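
Inside spark-shell a SparkSession named `spark` already exists, which is why the builder prints the WARN line above and hands back the existing session. In a standalone application you build the session yourself. The sketch below assumes a local run (`master("local[*]")`) and leaves out `enableHiveSupport()`, which needs the Hive classes on the classpath; the object name `CreateSessionDemo` and the helper `buildSession` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone entry point, packaged and launched with spark-submit
object CreateSessionDemo {
  // Assumption: local mode; on a cluster, drop master() and pass --master to spark-submit
  def buildSession(): SparkSession =
    SparkSession.builder()
      .appName("Spark SQL basic example")
      .master("local[*]")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    // A second getOrCreate() returns the same session instead of building a new one
    println(spark eq SparkSession.builder().getOrCreate())
    spark.stop()
  }
}
```

Because `getOrCreate()` reuses an active session, options passed to a later builder may be ignored, which is exactly what the WARN message in the transcript is saying.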

III. Create a DataFrame with toDF

1. Convert a Seq to a DataFrame

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
 | (1, "tank", 25),
 | (2, "zhang", 26)
 | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| tank| 25|
|  2|zhang| 26|
+---+-----+---+
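
toDF can also take its column names from a case class, so you do not have to list them by hand. A minimal sketch meant to be pasted into spark-shell (for example with `:paste`), assuming a local session when none exists; the case class `Person` is a hypothetical name chosen for the example.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the spark-shell session if there is one, otherwise starts a local one
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical record type: its field names become the column names
case class Person(id: Int, name: String, age: Int)

val people = Seq(Person(1, "tank", 25), Person(2, "zhang", 26)).toDF()
people.printSchema()
people.show()
```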

2. Convert a List to a DataFrame

scala> val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+
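
toDF is not limited to local collections: with `spark.implicits._` in scope, an RDD of tuples converts the same way. A sketch assuming a local session; `tupleRdd` and `fromRdd` are illustrative names, and the data repeats the list above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Distribute the tuples as an RDD first, then name the columns with toDF
val tupleRdd = spark.sparkContext.parallelize(List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29)))
val fromRdd = tupleRdd.toDF("id", "name", "age")
fromRdd.show()
```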

IV. Create a DataFrame with createDataFrame

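scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(List(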
 | StructField("id", IntegerType, nullable = false),
 | StructField("name", StringType, nullable = true),
 | StructField("age", IntegerType, nullable = true),
 | StructField("birthday", DateType, nullable = true)
 | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
 | Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
 | Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
 | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age|  birthday|
+---+-----+---+----------+
|  1|tank1| 25|1982-07-06|
|  2|zhang| 26|1983-02-18|
+---+-----+---+----------+

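The DataFrames created so far, and the data in them, live only in memory; nothing has been written to storage yet. Note also that the birthday values shown are one day earlier than the dates passed to java.sql.Date.valueOf. A shift like this most likely comes from a time-zone difference between the JVMs involved (for example the driver and the YARN executors), so check the time-zone settings if you see it.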
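
For small local data an RDD is not required: `createDataFrame` also accepts a `java.util.List[Row]` together with the schema. The sketch below assumes a local session and reuses the schema from above; `rows` and `fromRows` are illustrative names. `printSchema()` is a quick way to confirm that the `nullable` flags were taken from the StructType.

```scala
import java.sql.Date
import java.util.Arrays
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = StructType(List(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("birthday", DateType, nullable = true)
))

// A plain java.util.List[Row] instead of an RDD[Row]
val rows = Arrays.asList(
  Row(1, "tank1", 25, Date.valueOf("1982-07-07")),
  Row(2, "zhang", 26, Date.valueOf("1983-02-19"))
)
val fromRows = spark.createDataFrame(rows, schema)
fromRows.printSchema() // id is non-nullable, the other columns are nullable
```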

V. Save data to HDFS

1. Import SaveMode

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

2. Save without partitioning

scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
|  2|zhang| 26|
|  1| tank| 25|
+---+-----+---+

[Figure: Spark DataFrame saved without partitioning]
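
The first `df.write.parquet(...)` above sets no mode, so it uses the default, `SaveMode.ErrorIfExists`: running the same line a second time fails because the directory already exists. The sketch below shows this, plus `SaveMode.Ignore`, against a temporary local directory (an assumption standing in for the HDFS path). It reads the data back with `spark.read.parquet`, which behaves like `spark.read.load` as long as the default source format is left at Parquet.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumption: a throwaway local directory instead of hdfs://bigserver1:9000/test/spark/tank
val path = Files.createTempDirectory("spark-demo").resolve("tank").toString
val df = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")

df.write.parquet(path)                        // first write succeeds (default mode ErrorIfExists)
val secondWrite = Try(df.write.parquet(path)) // same path again: throws, captured by Try
df.write.mode(SaveMode.Ignore).parquet(path)  // Ignore: does nothing because the path exists
val readBack = spark.read.parquet(path)
println(s"second write failed: ${secondWrite.isFailure}, rows: ${readBack.count()}")
```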

3. Save partitioned by id

scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1");
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+
[Figure: Spark DataFrame saved with partitioning]
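
`partitionBy("id")` writes one sub-directory per value (`id=1`, `id=2`) and drops the column from the data files; on read, Spark rediscovers it from the directory names and appends it as the last column, as the output above shows. A sketch of reading back part of such a layout, assuming a local temporary directory in place of the HDFS path: pointing the reader at one partition directory needs the `basePath` option to keep the `id` column, while filtering on `id` over the whole table lets Spark skip the other directories.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumption: local temp directory instead of hdfs://bigserver1:9000/test/spark/tank1
val base = Files.createTempDirectory("spark-demo").resolve("tank1").toString
Seq((1, "tank1", 25), (2, "zhang", 26)).toDF("id", "name", "age")
  .write.partitionBy("id").mode(SaveMode.Overwrite).parquet(base)

// Read a single partition directory; basePath tells Spark where partition discovery starts
val onlyId2 = spark.read.option("basePath", base).parquet(s"$base/id=2")
onlyId2.show()

// Same rows via a filter on the partition column over the full table
val filtered = spark.read.parquet(base).where($"id" === 2)
```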

4. Appending data

scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|zhang| 26|
|  1| tank| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+
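
Append adds new files next to the existing ones and does no deduplication, which is why ids 1 and 2 now appear twice. The sketch below reproduces this on a temporary local directory (an assumption) and uses `mode("append")`, the string form of `SaveMode.Append`.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumption: local temp directory instead of the HDFS path used above
val path = Files.createTempDirectory("spark-demo").resolve("tank").toString
val first = Seq((1, "tank", 25), (2, "zhang", 26)).toDF("id", "name", "age")
val more = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29)).toDF("id", "name", "age")

first.write.parquet(path)
// String form of SaveMode.Append: existing files are kept, new ones are added
more.write.mode("append").parquet(path)

val all = spark.read.parquet(path)
println(all.count()) // 5
```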

5. Modifying data (this only works with partitioned data, by overwriting a single partition directory)

scala> test1.show();
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+

scala> val df4 = Seq(
 | (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
 | ).toDF("id", "name", "age","birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+---------+---+----------+---+
|     name|age|  birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11|  2|
|    tank1| 25|1982-07-06|  1|
+---------+---+----------+---+
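
Writing straight into the `id=2` directory works, but the files written there also contain an `id` column that duplicates the value encoded in the directory name. An alternative, assuming Spark 2.3 or later, is dynamic partition overwrite: with `spark.sql.sources.partitionOverwriteMode` set to `dynamic`, an Overwrite combined with `partitionBy` replaces only the partitions present in the new data. The sketch uses a temporary local directory as an assumption in place of the HDFS path.

```scala
import java.nio.file.Files
import java.sql.Date
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumption: local temp directory instead of hdfs://bigserver1:9000/test/spark/tank1
val base = Files.createTempDirectory("spark-demo").resolve("tank1").toString
Seq((1, "tank1", 25, Date.valueOf("1982-07-07")), (2, "zhang", 26, Date.valueOf("1983-02-19")))
  .toDF("id", "name", "age", "birthday")
  .write.partitionBy("id").mode(SaveMode.Overwrite).parquet(base)

// Dynamic mode: only partitions that appear in the new data are replaced
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
Seq((2, "zhangying", 37, Date.valueOf("1986-11-11")))
  .toDF("id", "name", "age", "birthday")
  .write.partitionBy("id").mode(SaveMode.Overwrite).parquet(base)
spark.conf.unset("spark.sql.sources.partitionOverwriteMode") // back to the default (static)

val updated = spark.read.parquet(base)
updated.orderBy("id").show() // id=1 keeps tank1, id=2 now holds zhangying
```

Without the dynamic setting, the same Overwrite would delete every partition under `base` first, leaving only `id=2`.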

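There are many more data-processing operations than these; this article is only a brief introduction. Note also that storing and reading data this way, as Parquet files on HDFS, is batch-oriented and has poor real-time performance.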

