Spark 从文本创建DataFrame的3种方式

栏目: 服务器 · 发布时间: 7年前

内容简介:Spark Sql的核心对象DataFrame提供了Sql查询的能力,极大的方便了数据的查询和统计。如果要对DataFrame进行Sql查询,需要指定Scheme信息。而文本文件往往缺少Scheme信息,这篇文章将演示3种从文本文件创建DataFrame,并设置其Scheme的方式。本文用来做示范的文件,是一个简化的手机端的事件日志,以“|”分隔,其数据格式为:一般我们在代码里习惯用英文,翻译一下就是:

Spark Sql的核心对象DataFrame提供了 Sql 查询的能力,极大的方便了数据的查询和统计。如果要对DataFrame进行Sql查询,需要指定Scheme信息。而文本文件往往缺少Scheme信息,这篇文章将演示3种从文本文件创建DataFrame,并设置其Scheme的方式。

范例文件介绍

本文用来做示范的文件,是一个简化的手机端的事件日志,以“|”分隔,其数据格式为:

客户端ip|事件时间|事件标识|机器码|操作系统|设备型号

一般我们在代码里习惯用英文,翻译一下就是:

ip|date|event|machine|os|device

事件日志的名称为 events.log,内容如下:

49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11s_11_A.25_180706|OPPO R11s
223.104.145.59|1536544452|icon_frame_314|865902037967669|android 7.1.1 NMF26F|MI MAX 2
222.93.202.10|1536544344|btn_speak_30|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544346|img_sound_293|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
222.93.202.10|1536544437|btn_hui_fu_1997|867701035125231|android 7.1.1 NMF26X release-keys|vivo X20Plus A
117.91.166.96|1536544419|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544426|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus
117.91.166.96|1536544433|btn_speak_30|864088038729653|android 6.0.1 R9sPlus_11_A.11_180515|OPPO R9s Plus

先将这个文件从当前目录(~/downloads/events.log)上传到hdfs的/test文件夹下,方便后续测试:

# hdfs dfs -put ~/downloads/events.log /test

方式1:从RDD创建

第一种方式先创建RDD,然后将RDD的行转换为Row对象,再根据Row去推断Schema。也可以转换为元组,再明确指定Schema。

# coding=utf-8
from pyspark.sql import SparkSession
from pyspark import Row
from pyspark.sql import types as T

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

sc = spark.sparkContext

linesRDD = sc.textFile("/test/events.log")
partsRDD = linesRDD.map(lambda x: x.split("|"))

# 方法1:创建ROW对象,根据ROW来创建Schema,自动推断字段类型
rowsRDD = partsRDD.map(lambda x:Row(ip=x[0],date=int(x[1]),event=x[2], machine=x[3], os=x[4], device=x[5]))
df = spark.createDataFrame(rowsRDD)

# 方法2:通过Schema字符串创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = "ip: string, date: long, event: string, machine: string, os: string, device: string"
# df = spark.createDataFrame(rowsRDD, schema)

# 方法3:通过StructType对象创建
# rowsRDD = partsRDD.map(lambda x:(x[0], int(x[1]), x[2], x[3], x[4], x[5]))
# schema = T.StructType([
#     T.StructField("ip", T.StringType(), True),
#     T.StructField("date", T.LongType(), False),
#     T.StructField("event", T.StringType()),
#     T.StructField("machine", T.StringType(), True),
#     T.StructField("os", T.StringType(), True),
#     T.StructField("device", T.StringType(), True),
# ])
# df = spark.createDataFrame(rowsRDD, schema)

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

这里采用了三种给RDD赋予Scheme的方法。最复杂(繁琐)的是第3种,但是第3种可以指定Field是否可以为Null。

结果如下(3种方法的结果大同小异):

# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-rdd.py
root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success

方式2:从DataFrame转换

从DataFrame转换,就是SparkSession的read方法,直接将文件读取为DataFrame,此时的Schema就一个string类型的字段,默认名为value,接着再对此DataFrame进行转换。

需要注意:pyspark.sql.functions.split函数,其接收的是一个正则表达式,而“|”在正则表达式中是一个特殊字符,因此需要用“\”进行转义。

# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# step1. 原始数据
df = spark.read.text("/test/events.log")
df.printSchema()
df.show()

# step2. 转成了数组
df = df.select(F.split(df["value"], '\|').alias("cols"))
df.printSchema()
df.show()

# step3. 将数组字段转成了列
cols = df["cols"]
df = df.withColumn("ip", cols.getItem(0)) \
    .withColumn("date", cols.getItem(1).cast(T.LongType())) \
    .withColumn("event", cols.getItem(2)) \
    .withColumn("machine", cols.getItem(3)) \
    .withColumn("os", cols.getItem(4)) \
    .withColumn("device", cols.getItem(5)) \
    .drop("cols")

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

输出的结果如下:

# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-df.py
root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|49.77.134.137|153...|
|223.104.145.59|15...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|222.93.202.10|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
|117.91.166.96|153...|
+--------------------+

root
 |-- cols: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|                cols|
+--------------------+
|[49.77.134.137, 1...|
|[223.104.145.59, ...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[222.93.202.10, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
|[117.91.166.96, 1...|
+--------------------+

root
 |-- ip: string (nullable = true)
 |-- date: long (nullable = true)
 |-- event: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- os: string (nullable = true)
 |-- device: string (nullable = true)

+-------------+----------+-------------+---------------+--------------------+-------------+
|           ip|      date|        event|        machine|                  os|       device|
+-------------+----------+-------------+---------------+--------------------+-------------+
|49.77.134.137|1536544453|touxiang_1267|868385038260432|android 7.1.1 R11...|    OPPO R11s|
|117.91.166.96|1536544419| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544426| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
|117.91.166.96|1536544433| btn_speak_30|864088038729653|android 6.0.1 R9s...|OPPO R9s Plus|
+-------------+----------+-------------+---------------+--------------------+-------------+

success

方式3:使用csv方法

最后一种就是使用csv方法了,可以直接指定分隔符和schema:

需要注意:这里的schema字符串格式,和SparkSession.CreateDataFrame方法中接收的schema字符串格式略有差异,少了“:”。

# coding=utf-8
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

schema = "ip string, date long, event string, machine string, os string, device string"
df = spark.read.csv("/test/events.log", sep="|", schema=schema)

df.printSchema()
df.createOrReplaceTempView("events")
df = spark.sql("select * from events where device like '%OPPO%'")
df.show()

print "success"

其运行结果和方式1是类似的,就不再演示了。

总结

不是后缀名是csv的文件就是csv文件,csv文件本意是以逗号分隔的文件,扩展一下,就是以某字符进行分隔的文件都可以视为csv文件。因此,对于上面的事件日志文件,也可以视为csv文件,那么显然,使用上面的方式3是最方便快捷的了。我本人最开始用的是方式1,兜兜转转才发现了方式3,文档还是看得不够仔细呀。总的来说,Spark的文档在几个开源大数据项目中,算是最优秀的一个了。有时间就看看说不定就会有新的收获。

感谢阅读,希望这篇文章能给你带来帮助!

Spark 从文本创建DataFrame的3种方式


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Practice of Programming

The Practice of Programming

Brian W. Kernighan、Rob Pike / Addison-Wesley / 1999-2-14 / USD 49.99

With the same insight and authority that made their book The Unix Programming Environment a classic, Brian Kernighan and Rob Pike have written The Practice of Programming to help make individual progr......一起来看看 《The Practice of Programming》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具