内容简介:本文中,我们来看下用 Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time 进行聚合,可以使用 watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我我们先来了解下 kafka, 然后举几个 Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。/ kaf
本文中,我们来看下用 Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time 进行聚合,可以使用 watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我
-
可以让你流式处理和批处理的代码一样一样的
-
整合各种外部存储系统,比如 S3,HDFS, MySQL
-
对每个批次的增使用 Catalyst 优化器 进行优化
我们先来了解下 kafka, 然后举几个 Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。
/ kafka 了解一下?/
kafka 是现在最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景, kafka 基本是标配。
kafka 把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟 structured commit log 类似,生产者和消费者使用 kafka 进行解耦,消费者不管你生产者发送的速率如何,我只要按照我们的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号 offset, kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。
kafka 消费策略
kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了,这就涉及一个问题,就是你如果开始消费,就要定一下从什么位置开始。
-
earliest 从最起始位置开始消费,当然不一定是从 0 开始,因为如果数据过期了,就清掉了,所以可以理解为从现存的数据里最小位置开始消费。
-
latest 这个好理解,从最末位置开始消费。
-
per-partition assignment , 对每个分区都指定一个 offset, 然后从这组 offset 位置开始消费。
当你第一次开始消费一个 kafka 流的时候,上述策略任选其一,如果你之前已经消费了,而且做了 checkpoint ,比如你消费程序升级了, 这时候就会从你上次结束的位置开始继续消费。
/ Structured streaming 对kafka的支持 /
Structured Streaming 对 批处理 和流处理 提供统一的 API, 这一点在我看来是很重要的,我们一套相同的处理逻辑,没必要写出来两套代码,减少了维护成本。 Structured Streaming 从 kafka 拉取消息,然后就可以把 流数据看做 一个 DataFrame , 一张无限增长的大表,在这个大表上做查询,Structured Streaming 给你保证了端到端的 exactly-once,你只需要关心你的业务即可,不用费心去关心底层是怎么做的。
从 kafka topics 中读取消息
首先需要你指定你的数据源,kafka 集群的连接地址,你需要消费的 topic, 指定topic 的时候,可以使用正则来指定,也可以指定一个 topic 的集合,是不是很灵活?我们这里只消费一个 topic
这个例子里面我们指定消费 topic1, 在 spark 内部 DataStreamReader 会根据你的 配置来 拉取指定的 topic, kafka.bootstrap.servers (i.e. host:port) 地址 和 topic 这两个参数是必填的。startingOffsets 可以选填,默认是 从 latest 位置开始消费
df.printSchema() , 我们来打印一些 DataFrame 的 schema
我们可以看到,schema 里面包含了 kafka record 里面的key value 和 相关 元信息字段,到这时候,我们就可以使用 DataFrame 和 Dataset 的方式来进行转换处理了,有人问,如果 kafka 消息的 value 字节数组里面包含的 是 json 数据怎么办,我们在这里拿到的 key 和 value 都是 字节数组,需要解析一下格式,当然 Spark SQL 内嵌了一些类型处理方法了。
Data Stored as a UTF8 String
把字节数据转为 strings 类型
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Data Stored as JSON
如果你在kafka 里面保存的是 json 格式,我们可以使用内嵌的 from_json 去解析,你需要传入一个 schema, 然后就变成了 Spark SQL 的类型了。
使用UDF注册反序列化类
你也可以注册一个 udf 进行反序列化,你可以实现一个 MyDeserialzer ,只要实现了 Kafka Deserializer 的接口即可
/ 输出数据到 kafka /
往 kafka 里面写数据也类似,我们可以在 DataFrame 上调用 writeStream 来写入 kafka, 指定value, key 是可选的,如果你不指定,就是null。如果 key 为null, 有时候可能导致 分区数据不均匀,需要注意一下。
需要打到哪个 topic, 可以在 D ataStreamWriter 上指定 option 配置, 也可以操作 DataFrame 的时候 在每条 record 上加一列 topic 字段指定。
这个例子中,DataFrame 中是用户信息数据,我们把 序列化后的 userId 作为 key,然后把所有字段 都转成 json string 格式当做 value
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 「学习笔记」无法远程访问 MySQL 碰到的坑
- Junit单元测试碰到静态变量如何处理
- IT 职场新人碰到的几个常见误区
- React小知识(3) - 国际化中碰到的问题
- 无依赖开发中的碰到的问题——封装DOM操作
- 第一次把mysql装进docker里碰到的各种问题
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入浅出程序设计(中文版)
Paul Barry、David Griffiths / 蒋雁翔、童健 / 东南大学出版社 / 2012-1 / 98.00元
《深入浅出程序设计(中文版)》介绍了编写计算机程序的核心概念:变量、判断、循环、函数与对象——无论运用哪种编程语言,都能在动态且多用途的python语言中使用具体示例和练习来运用并巩固这些概念。学习基本的工具来开始编写你感兴趣的程序,而不是其他人认为你应该使用的通用软件,并对软件能做什么(不能做什么)有一个更好的了解。当你完成这些,你就拥有了必要的基础去使用任何一种你需要或想要学习的语言或软件项目......一起来看看 《深入浅出程序设计(中文版)》 这本书的介绍吧!