内容简介:Spark Sql的核心对象DataFrame提供了Sql查询的能力,极大的方便了数据的查询和统计。如果要对DataFrame进行Sql查询,需要指定Scheme信息。而文本文件往往缺少Scheme信息,这篇文章将演示3种从文本文件创建DataFrame,并设置其Scheme的方式。本文用来做示范的文件,是一个简化的手机端的事件日志,以“|”分隔,其数据格式为:一般我们在代码里习惯用英文,翻译一下就是:
Spark Sql的核心对象DataFrame提供了 Sql 查询的能力,极大的方便了数据的查询和统计。如果要对DataFrame进行Sql查询,需要指定Scheme信息。而文本文件往往缺少Scheme信息,这篇文章将演示3种从文本文件创建DataFrame,并设置其Scheme的方式。
范例文件介绍
本文用来做示范的文件,是一个简化的手机端的事件日志,以“|”分隔,其数据格式为:
客户端ip|事件时间|事件标识|机器码|操作系统|设备型号
一般我们在代码里习惯用英文,翻译一下就是:
ip|date|event|machine|os|device
事件日志的名称为 events.log,内容如下:
49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11s_11_A.25_180706|OPPO R11s 223.104.145.59|1536544452|icon_frame_314|865902037967669|android 7.1.1 NMF26F|MI MAX 2 222.93.202.10|1536544344|btn_speak_30|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A 222.93.202.10|1536544346|img_sound_293|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A 222.93.202.10|1536544437|btn_hui_fu_1997|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A 117.91.166.96|1536544419|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus 117.91.166.96|1536544426|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus 117.91.166.96|1536544433|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
先将这个文件从当前目录(~/downloads/events.log)上传到hdfs的/test文件夹下,方便后续测试:
# hdfs dfs -put ~/downloads/events.log /test
方式1:从RDD创建
第一种方式先创建RDD,然后将RDD的行转换为Row对象,再根据Row去推断Schema。也可以转换为元组,再明确指定Schema。
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark import Row
from pyspark.sql import types as T
spark = SparkSession.builder\
.master("local[*]")\
.appName("test.dataframe")\
.getOrCreate()
sc = spark.sparkContext
linesRDD = sc.textFile("/test/events.log")
partsRDD = linesRDD.map(lambda x: x.split("|"))
# 方法1:创建ROW对象,根据ROW来创建Schema,自动推断字段类型
rowsRDD = partsRDD.map(lambda x:Row(ip=x[0],date=int(x[1]),event=x[2], machine=x[3], os=x[4], device=x[5]))
df = spark.createDataFrame(rowsRDD)
# 方法2:通过Schema字符串创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = "ip: string, date: long, event: string, machine: string, os: string, device: string"
# df = spark.createDataFrame(rowsRDD, schema)
# 方法3:通过StructType对象创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = T.StructType([
# T.StructField("ip", T.StringType(), True),
# T.StructField("date", T.LongType(), False),
# T.StructField("event", T.StringType()),
# T.StructField("machine", T.StringType(), True),
# T.StructField("os", T.StringType(), True),
# T.StructField("device", T.StringType(), True),
# ])
# df = spark.createDataFrame(rowsRDD, schema)
df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print "success"
这里采用了三种给RDD赋予Scheme的方法。最复杂(繁琐)的是第3种,但是第3种可以指定Field是否可以为Null。
结果如下(3种方法的结果大同小异):
# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-rdd.py root |-- ip: string (nullable = true) |-- date: long (nullable = true) |-- event: string (nullable = true) |-- machine: string (nullable = true) |-- os: string (nullable = true) |-- device: string (nullable = true) +-------------+----------+-------------+---------------+--------------------+-------------+ | ip| date| event| machine| os| device| +-------------+----------+-------------+---------------+--------------------+-------------+ |49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...| OPPO R11s| |117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| |117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| |117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| +-------------+----------+-------------+---------------+--------------------+-------------+ success
方式2:从DataFrame转换
从DataFrame转换,就是SparkSession的read方法,直接将文件读取为DataFrame,此时的Schema就一个string类型的字段,默认名为value,接着再对此DataFrame进行转换。
需要注意:pyspark.sql.functions.split函数,其接收的是一个正则表达式,而“|”在正则表达式中是一个特殊字符,因此需要用“\”进行转义。
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
spark = SparkSession.builder\
.master("local[*]")\
.appName("test.dataframe")\
.getOrCreate()
# step1. 原始数据
df = spark.read.text("/test/events.log")
df.printSchema()
df.show()
# step2. 转成了数组
df = df.select(F.split(df["value"], '\|').alias("cols"))
df.printSchema()
df.show()
# step3. 将数组字段转成了列
cols = df["cols"]
df = df.withColumn("ip", cols.getItem(0)) \
.withColumn("date", cols.getItem(1).cast(T.LongType())) \
.withColumn("event", cols.getItem(2)) \
.withColumn("machine", cols.getItem(3)) \
.withColumn("os", cols.getItem(4)) \
.withColumn("device", cols.getItem(5)) \
.drop("cols")
df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print "success"
输出的结果如下:
# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-df.py root |-- value: string (nullable = true) +--------------------+ | value| +--------------------+ |49.77.134.137|153...| |223.104.145.59|15...| |222.93.202.10|153...| |222.93.202.10|153...| |222.93.202.10|153...| |117.91.166.96|153...| |117.91.166.96|153...| |117.91.166.96|153...| +--------------------+ root |-- cols: array (nullable = true) | |-- element: string (containsNull = true) +--------------------+ | cols| +--------------------+ |[49.77.134.137, 1...| |[223.104.145.59, ...| |[222.93.202.10, 1...| |[222.93.202.10, 1...| |[222.93.202.10, 1...| |[117.91.166.96, 1...| |[117.91.166.96, 1...| |[117.91.166.96, 1...| +--------------------+ root |-- ip: string (nullable = true) |-- date: long (nullable = true) |-- event: string (nullable = true) |-- machine: string (nullable = true) |-- os: string (nullable = true) |-- device: string (nullable = true) +-------------+----------+-------------+---------------+--------------------+-------------+ | ip| date| event| machine| os| device| +-------------+----------+-------------+---------------+--------------------+-------------+ |49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...| OPPO R11s| |117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| |117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| |117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus| +-------------+----------+-------------+---------------+--------------------+-------------+ success
方式3:使用csv方法
最后一种就是使用csv方法了,可以直接指定分隔符和schema:
需要注意:这里的schema字符串格式,和SparkSession.CreateDataFrame方法中接收的schema字符串格式略有差异,少了“:”。
# coding=utf-8
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName("test.dataframe")\
.getOrCreate()
schema = "ip string, date long, event string, machine string, os string, device string"
df = spark.read.csv("/test/events.log", sep="|", schema=schema)
df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()
print "success"
其运行结果和方式1是类似的,就不再演示了。
总结
不是后缀名是csv的文件就是csv文件,csv文件本意是以逗号分隔的文件,扩展一下,就是以某字符进行分隔的文件都可以视为csv文件。因此,对于上面的事件日志文件,也可以视为csv文件,那么显然,使用上面的方式3是最方便快捷的了。我本人最开始用的是方式1,兜兜转转才发现了方式3,文档还是看得不够仔细呀。总的来说,Spark的文档在几个开源大数据项目中,算是最优秀的一个了。有时间就看看说不定就会有新的收获。
感谢阅读,希望这篇文章能给你带来帮助!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 游戏制作之路(28)创建界面文本显示
- 在Swift 5中从原始文本创建字符串
- 【PS-3D功能探索】如何在ps内创建膨胀的3D文本效果
- 基于标签特定文本表示的文本多标签分类
- 论文浅尝 | 通过文本到文本神经问题生成的机器理解
- 文本挖掘从小白到精通(三)---主题模型和文本数据转换
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Practice of Programming
Brian W. Kernighan、Rob Pike / Addison-Wesley / 1999-2-14 / USD 49.99
With the same insight and authority that made their book The Unix Programming Environment a classic, Brian Kernighan and Rob Pike have written The Practice of Programming to help make individual progr......一起来看看 《The Practice of Programming》 这本书的介绍吧!
XML 在线格式化
在线 XML 格式化压缩工具
HSV CMYK 转换工具
HSV CMYK互换工具