内容简介:本文中,我们来看下用 Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time 进行聚合,可以使用 watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我我们先来了解下 kafka, 然后举几个 Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。/ kaf
本文中,我们来看下用 Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time 进行聚合,可以使用 watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我
-
可以让你流式处理和批处理的代码一样一样的
-
整合各种外部存储系统,比如 S3,HDFS, MySQL
-
对每个批次的增使用 Catalyst 优化器 进行优化
我们先来了解下 kafka, 然后举几个 Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。
/ kafka 了解一下?/
kafka 是现在最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景, kafka 基本是标配。
kafka 把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟 structured commit log 类似,生产者和消费者使用 kafka 进行解耦,消费者不管你生产者发送的速率如何,我只要按照我们的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号 offset, kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。
kafka 消费策略
kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了,这就涉及一个问题,就是你如果开始消费,就要定一下从什么位置开始。
-
earliest 从最起始位置开始消费,当然不一定是从 0 开始,因为如果数据过期了,就清掉了,所以可以理解为从现存的数据里最小位置开始消费。
-
latest 这个好理解,从最末位置开始消费。
-
per-partition assignment , 对每个分区都指定一个 offset, 然后从这组 offset 位置开始消费。
当你第一次开始消费一个 kafka 流的时候,上述策略任选其一,如果你之前已经消费了,而且做了 checkpoint ,比如你消费程序升级了, 这时候就会从你上次结束的位置开始继续消费。
/ Structured streaming 对kafka的支持 /
Structured Streaming 对 批处理 和流处理 提供统一的 API, 这一点在我看来是很重要的,我们一套相同的处理逻辑,没必要写出来两套代码,减少了维护成本。 Structured Streaming 从 kafka 拉取消息,然后就可以把 流数据看做 一个 DataFrame , 一张无限增长的大表,在这个大表上做查询,Structured Streaming 给你保证了端到端的 exactly-once,你只需要关心你的业务即可,不用费心去关心底层是怎么做的。
从 kafka topics 中读取消息
首先需要你指定你的数据源,kafka 集群的连接地址,你需要消费的 topic, 指定topic 的时候,可以使用正则来指定,也可以指定一个 topic 的集合,是不是很灵活?我们这里只消费一个 topic
这个例子里面我们指定消费 topic1, 在 spark 内部 DataStreamReader 会根据你的 配置来 拉取指定的 topic, kafka.bootstrap.servers (i.e. host:port) 地址 和 topic 这两个参数是必填的。startingOffsets 可以选填,默认是 从 latest 位置开始消费
df.printSchema() , 我们来打印一些 DataFrame 的 schema
我们可以看到,schema 里面包含了 kafka record 里面的key value 和 相关 元信息字段,到这时候,我们就可以使用 DataFrame 和 Dataset 的方式来进行转换处理了,有人问,如果 kafka 消息的 value 字节数组里面包含的 是 json 数据怎么办,我们在这里拿到的 key 和 value 都是 字节数组,需要解析一下格式,当然 Spark SQL 内嵌了一些类型处理方法了。
Data Stored as a UTF8 String
把字节数据转为 strings 类型
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Data Stored as JSON
如果你在kafka 里面保存的是 json 格式,我们可以使用内嵌的 from_json 去解析,你需要传入一个 schema, 然后就变成了 Spark SQL 的类型了。
使用UDF注册反序列化类
你也可以注册一个 udf 进行反序列化,你可以实现一个 MyDeserialzer ,只要实现了 Kafka Deserializer 的接口即可
/ 输出数据到 kafka /
往 kafka 里面写数据也类似,我们可以在 DataFrame 上调用 writeStream 来写入 kafka, 指定value, key 是可选的,如果你不指定,就是null。如果 key 为null, 有时候可能导致 分区数据不均匀,需要注意一下。
需要打到哪个 topic, 可以在 D ataStreamWriter 上指定 option 配置, 也可以操作 DataFrame 的时候 在每条 record 上加一列 topic 字段指定。
这个例子中,DataFrame 中是用户信息数据,我们把 序列化后的 userId 作为 key,然后把所有字段 都转成 json string 格式当做 value
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 「学习笔记」无法远程访问 MySQL 碰到的坑
- Junit单元测试碰到静态变量如何处理
- IT 职场新人碰到的几个常见误区
- React小知识(3) - 国际化中碰到的问题
- 无依赖开发中的碰到的问题——封装DOM操作
- 第一次把mysql装进docker里碰到的各种问题
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Definitive Guide to MongoDB
Peter Membrey、Wouter Thielen / Apress / 2010-08-26 / USD 44.99
MongoDB, a cross-platform NoSQL database, is the fastest-growing new database in the world. MongoDB provides a rich document orientated structure with dynamic queries that you’ll recognize from RDMBS ......一起来看看 《The Definitive Guide to MongoDB》 这本书的介绍吧!