spark 创建 dataframe

栏目: Scala · 发布时间: 7年前

内容简介:dataframe类似于关系型数据库的表,从dataframe中查询数据,需要调用api来实现,到目前为止spark支持的语言scala,java,r,python。4,数据的追加

dataframe类似于关系型数据库的表,从dataframe中查询数据,需要调用api来实现,到目前为止spark支持的语言scala,java,r,python。

一,启动spark-shell

spark-shell --master yarn

二,创建SparkSession

scala> import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097

三,toDF方法创建dataframe

1,将seq序列转换成dataframe

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
 | (1, "tank", 25),
 | (2, "zhang", 26)
 | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1| tank| 25|
| 2|zhang| 26|
+---+-----+---+

2,列表转换成dataframe

scala> val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

四,通过createDataFrame创建dataframe

scala> val schema = StructType(List(
 | StructField("id", IntegerType, nullable = false),
 | StructField("name", StringType, nullable = true),
 | StructField("age", IntegerType, nullable = true),
 | StructField("birthday", DateType, nullable = true)
 | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
 | Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
 | Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
 | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age| birthday|
+---+-----+---+----------+
| 1|tank1| 25|1982-07-06|
| 2|zhang| 26|1983-02-18|
+---+-----+---+----------+

创建的dataframe,以及里面的数据,是没有落地的

五,存储数据到hdfs

1,加载存储包

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

2,不分区存储

scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 2|zhang| 26|
| 1| tank| 25|
+---+-----+---+
spark 创建 dataframe

spark dataframe 不分区

3,根据id分区存储

scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1");
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+
spark 创建 dataframe

spark dataframe 分区存储

4,数据的追加

scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|zhang| 26|
| 1| tank| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

5,数据的修改,分区的dataframe才有效

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+

scala> val df4 = Seq(
 | (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
 | ).toDF("id", "name", "age","birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+---------+---+----------+---+
| name|age| birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11| 2|
| tank1| 25|1982-07-06| 1|
+---------+---+----------+---+

对数据处理操作非常的多,这里只是简单介绍。这种数据存取的方式,实时性差


以上所述就是小编给大家介绍的《spark 创建 dataframe》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Practical JavaScript, DOM Scripting and Ajax Projects

Practical JavaScript, DOM Scripting and Ajax Projects

Frank Zammetti / Apress / April 16, 2007 / $44.99

http://www.amazon.com/exec/obidos/tg/detail/-/1590598164/ Book Description Practical JavaScript, DOM, and Ajax Projects is ideal for web developers already experienced in JavaScript who want to ......一起来看看 《Practical JavaScript, DOM Scripting and Ajax Projects》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具