A DataFrame is similar to a table in a relational database, and querying data from a DataFrame is done by calling its API. As of this writing, Spark exposes that API in Scala, Java, R, and Python.
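As a taste of that API, here is a minimal query sketch; it assumes the df built in section III below (columns id, name, age) and the import spark.implicits._ from that section:

// Select two columns and keep only the rows with age > 25
df.select("name", "age").filter($"age" > 25).show()
// expected result: the single row (zhang, 26)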
I. Starting spark-shell
spark-shell --master yarn
II. Creating a SparkSession
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097

The WARN is expected: spark-shell already creates a SparkSession named spark, so getOrCreate() returns that existing session instead of building a new one.
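For comparison, in a standalone application (outside spark-shell) you build the session yourself. A minimal sketch; the object name Example is ours:

import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {
    // getOrCreate() builds a new session, or returns one that already exists
    val spark = SparkSession.builder()
      .appName("Spark SQL basic example")
      .enableHiveSupport()  // requires Hive dependencies on the classpath
      .getOrCreate()

    // ... create and query DataFrames here ...

    spark.stop()  // release cluster resources when done
  }
}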
III. Creating a DataFrame with toDF
1. Converting a Seq to a DataFrame
scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
     |   (1, "tank", 25),
     |   (2, "zhang", 26)
     | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| tank| 25|
|  2|zhang| 26|
+---+-----+---+
2. Converting a List to a DataFrame
scala> val array = List((1, "tank1", 25), (2, "tank2", 26), (3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+
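toDF is not limited to local collections: with the same spark.implicits._ import, an RDD of tuples converts the same way, which is handy when the data already lives in an RDD. A small sketch (the names rddOfTuples and df1b are ours):

// Build an RDD of tuples, then name its columns with toDF
val rddOfTuples = spark.sparkContext.parallelize(List((1, "tank1", 25), (2, "tank2", 26)))
val df1b = rddOfTuples.toDF("id", "name", "age")
df1b.printSchema()  // prints the inferred schema: id, name, age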
IV. Creating a DataFrame with createDataFrame
The schema classes (StructType, StructField, the type objects) live in org.apache.spark.sql.types, and Row in org.apache.spark.sql, so import them first:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(List(
     |   StructField("id", IntegerType, nullable = false),
     |   StructField("name", StringType, nullable = true),
     |   StructField("age", IntegerType, nullable = true),
     |   StructField("birthday", DateType, nullable = true)
     | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
     |   Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
     |   Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
     | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age|  birthday|
+---+-----+---+----------+
|  1|tank1| 25|1982-07-06|
|  2|zhang| 26|1983-02-18|
+---+-----+---+----------+

Note that the dates read back are one day earlier than the ones written (1982-07-07 became 1982-07-06); this is most likely a time-zone mismatch between the JVM creating the java.sql.Date values and the session displaying them, not data loss.
The DataFrames created so far, and the data inside them, live only in the Spark session; nothing has been persisted to storage yet.
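If the data only needs to outlive a few queries in the same session, caching it in memory is an option before (or instead of) writing it out; a minimal sketch:

df2.cache()      // mark the DataFrame for in-memory caching
df2.count()      // the first action actually materializes the cache
df2.unpersist()  // release the memory when done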
V. Saving data to HDFS
1. Importing SaveMode
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
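SaveMode decides what happens when the target path already contains data. For reference, the four values of the org.apache.spark.sql.SaveMode enum:

// SaveMode.ErrorIfExists -- the default: fail if the path already contains data
// SaveMode.Append        -- add the new rows alongside the existing data
// SaveMode.Overwrite     -- replace the existing data
// SaveMode.Ignore        -- silently skip the write if data already exists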
2. Saving without partitioning
scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank")

scala> var test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank")
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  2|zhang| 26|
|  1| tank| 25|
+---+-----+---+

spark.read.load works here without naming a format because parquet is Spark's default data source.
[Figure: spark dataframe, unpartitioned storage]
3. Partitioning by id
scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1")

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show()
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+

The partition column id moves to the end of the schema because it is now encoded in the directory names (id=1/, id=2/) rather than stored in the data files.
[Figure: spark dataframe, partitioned storage]
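The practical payoff of partitioning by id is partition pruning: a filter on the partition column only scans the matching id=<value> directories. A minimal sketch, reusing the tank1 path from above (assumes the session's import spark.implicits._ for the $ syntax):

// Only the id=1 directory is scanned, not the whole dataset
spark.read.load("hdfs://bigserver1:9000/test/spark/tank1").where($"id" === 1).show()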
4. Appending data
scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank")

scala> var test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank")
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|tank1| 25|
|  2|zhang| 26|
|  1| tank| 25|
|  2|tank2| 26|
|  3|tank3| 29|
+---+-----+---+

Append simply writes new files next to the existing ones, so df1's schema must be compatible with what is already stored; it does no deduplication, which is why ids 1 and 2 now appear twice.
5. Modifying data (only practical with a partitioned DataFrame, because a single partition directory can be overwritten in place)
scala> test1.show()
+-----+---+----------+---+
| name|age|  birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18|  2|
|tank1| 25|1982-07-06|  1|
+-----+---+----------+---+

scala> val df4 = Seq(
     |   (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
     | ).toDF("id", "name", "age", "birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2")

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show()
+---------+---+----------+---+
|     name|age|  birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11|  2|
|    tank1| 25|1982-07-06|  1|
+---------+---+----------+---+

Only the id=2 directory is rewritten; the other partitions are untouched.
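Writing directly into the id=2 directory works, but it bypasses Spark's own partition handling. On Spark 2.3 and later, a sketch of an alternative: set partitionOverwriteMode to dynamic, so an Overwrite write replaces only the partitions present in the incoming data.

// Replace only the partitions that occur in df4 (here id=2); the others survive
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df4.write.mode(SaveMode.Overwrite).partitionBy("id").parquet("hdfs://bigserver1:9000/test/spark/tank1")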
There are many more operations for working with the data; this has been only a brief introduction. Note also that this style of reading and writing is batch-oriented, so it is a poor fit for real-time workloads.