Spark DataFrame解析json字符串

栏目: 服务器 · 发布时间: 6年前

内容简介:近期在做Spark Streaming方面的测试,从Kafka中实时取数据。此时接收到的数据是一段json数组形式的字符串,那么就需要将其解析为DataFrame的行数据,以方便进行实时运算。下面的代码示例演示了如何实现这个功能,因为比较简单,就多作说明了。假设初始的字符串是:[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}],最终将其解析为下面的DataFrame:df.show()只显示了struct的字段值,没有显示字段名称,并且用[]

近期在做Spark Streaming方面的测试,从Kafka中实时取数据。此时接收到的数据是一段json数组形式的字符串,那么就需要将其解析为DataFrame的行数据,以方便进行实时运算。下面的代码示例演示了如何实现这个功能,因为比较简单,就多作说明了。

假设初始的字符串是:[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}],最终将其解析为下面的DataFrame:

a b
1 2
3 4
5 6
7 8
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test.dataframe")\
    .getOrCreate()

# 第1步,加载数据,默认为字符串类型的单列,列名为value
data = ['[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}]']
df = spark.createDataFrame(data, T.StringType())
df.printSchema()
df.show()

schema = T.ArrayType(T.StructType([
    T.StructField("a", T.IntegerType()),
    T.StructField("b", T.IntegerType())
]))

# 第2步,将列转为数组类型
df = df.select(F.from_json(df["value"], schema).alias("json"))
df.printSchema()
df.show()

# 第3步,将列转为Struct类型
df = df.select(F.explode(df["json"]).alias("col"));
df.printSchema()
df.show()

# 第4步,对Struct进行拆分
col = df["col"]
df = df.withColumn("a", col["a"]) \
    .withColumn("b", col["b"]) \
    .drop("col")
df.printSchema()
df.show()

print "success"

df.show()只显示了struct的字段值,没有显示字段名称,并且用[],而不是{}来代表Struct,所以看上去像是一个数组

其执行结果如下:

# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-json.py
root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|[{"a":1,"b":2},{"...|
+--------------------+

root
 |-- json: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: integer (nullable = true)

+--------------------+
|                json|
+--------------------+
|[[1, 2], [3, 4], ...|
+--------------------+

root
 |-- col: struct (nullable = true)
 |    |-- a: integer (nullable = true)
 |    |-- b: integer (nullable = true)

+------+
|   col|
+------+
|[1, 2]|
|[3, 4]|
|[5, 6]|
|[7, 8]|
+------+

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
|  5|  6|
|  7|  8|
+---+---+

success

感谢阅读,希望这篇文章能给你带来帮助!

Spark DataFrame解析json字符串


以上所述就是小编给大家介绍的《Spark DataFrame解析json字符串》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据驱动设计

数据驱动设计

[美]罗谢尔·肯(RochelleKing)、[美]伊丽莎白F.邱吉尔(Elizabeth F Churchill)、Caitlin Tan / 傅婕 / 机械工业出版社 / 2018-8 / 69.00元

本书旨在帮你了解数据引导设计的基本原则,了解数据与设计流程整合的价值,避免常见的陷阱与误区。本书重点关注定量实验与A/B测试,因为我们发现,数据分析与设计实践在此鲜有交集,但相对的潜在价值与机会缺大。本书提供了一些关于在组织中开展数据实践的观点。通过阅读这本书,你将转变你的团队的工作方式,从数据中获得大收益。后希望你可以在衡量指标的选择、佳展示方式与展示时机、测试以及设计意图增强方面,自信地表达自......一起来看看 《数据驱动设计》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码