内容简介:近期在做Spark Streaming方面的测试,从Kafka中实时取数据。此时接收到的数据是一段json数组形式的字符串,那么就需要将其解析为DataFrame的行数据,以方便进行实时运算。下面的代码示例演示了如何实现这个功能,因为比较简单,就多作说明了。假设初始的字符串是:[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}],最终将其解析为下面的DataFrame:df.show()只显示了struct的字段值,没有显示字段名称,并且用[]
近期在做Spark Streaming方面的测试,从Kafka中实时取数据。此时接收到的数据是一段json数组形式的字符串,那么就需要将其解析为DataFrame的行数据,以方便进行实时运算。下面的代码示例演示了如何实现这个功能,因为比较简单,就多作说明了。
假设初始的字符串是:[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}],最终将其解析为下面的DataFrame:
a | b |
---|---|
1 | 2 |
3 | 4 |
5 | 6 |
7 | 8 |
# coding=utf-8 from pyspark.sql import SparkSession from pyspark.sql import types as T from pyspark.sql import functions as F spark = SparkSession.builder\ .master("local[*]")\ .appName("test.dataframe")\ .getOrCreate() # 第1步,加载数据,默认为字符串类型的单列,列名为value data = ['[{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8}]'] df = spark.createDataFrame(data, T.StringType()) df.printSchema() df.show() schema = T.ArrayType(T.StructType([ T.StructField("a", T.IntegerType()), T.StructField("b", T.IntegerType()) ])) # 第2步,将列转为数组类型 df = df.select(F.from_json(df["value"], schema).alias("json")) df.printSchema() df.show() # 第3步,将列转为Struct类型 df = df.select(F.explode(df["json"]).alias("col")); df.printSchema() df.show() # 第4步,对Struct进行拆分 col = df["col"] df = df.withColumn("a", col["a"]) \ .withColumn("b", col["b"]) \ .drop("col") df.printSchema() df.show() print "success"
df.show()只显示了struct的字段值,没有显示字段名称,并且用[],而不是{}来代表Struct,所以看上去像是一个数组
其执行结果如下:
# $SPARK_HOME/bin/spark-submit /data/pyjobs/sample/dataframe-json.py root |-- value: string (nullable = true) +--------------------+ | value| +--------------------+ |[{"a":1,"b":2},{"...| +--------------------+ root |-- json: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- a: integer (nullable = true) | | |-- b: integer (nullable = true) +--------------------+ | json| +--------------------+ |[[1, 2], [3, 4], ...| +--------------------+ root |-- col: struct (nullable = true) | |-- a: integer (nullable = true) | |-- b: integer (nullable = true) +------+ | col| +------+ |[1, 2]| |[3, 4]| |[5, 6]| |[7, 8]| +------+ root |-- a: integer (nullable = true) |-- b: integer (nullable = true) +---+---+ | a| b| +---+---+ | 1| 2| | 3| 4| | 5| 6| | 7| 8| +---+---+ success
感谢阅读,希望这篇文章能给你带来帮助!
以上所述就是小编给大家介绍的《Spark DataFrame解析json字符串》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 查找一个字符串中最长不含重复字符的子字符串,计算该最长子字符串的长度
- 字符串、字符处理总结
- 高频算法面试题(字符串)leetcode 387. 字符串中的第一个唯一字符
- php删除字符串最后一个字符
- (三)C语言之字符串与字符串函数
- 算法笔记字符串处理问题H:编排字符串(2064)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。