当 Structured Streaming 碰到 Kafka

栏目: 后端 · 发布时间: 6年前

内容简介:本文中,我们来看下用  Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time  进行聚合,可以使用  watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我我们先来了解下 kafka, 然后举几个  Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。/ kaf

本文中,我们来看下用  Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time  进行聚合,可以使用  watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我

  • 可以让你流式处理和批处理的代码一样一样的

  • 整合各种外部存储系统,比如 S3,HDFS, MySQL

  • 对每个批次的增使用 Catalyst 优化器 进行优化

我们先来了解下 kafka, 然后举几个  Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。

/ kafka 了解一下?/ 

kafka 是现在最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景, kafka 基本是标配。

kafka 把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟 structured commit log 类似,生产者和消费者使用 kafka 进行解耦,消费者不管你生产者发送的速率如何,我只要按照我们的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号 offset, kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。

kafka 消费策略

kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了,这就涉及一个问题,就是你如果开始消费,就要定一下从什么位置开始。

  • earliest 从最起始位置开始消费,当然不一定是从 0 开始,因为如果数据过期了,就清掉了,所以可以理解为从现存的数据里最小位置开始消费。

  • latest 这个好理解,从最末位置开始消费。

  • per-partition assignment , 对每个分区都指定一个 offset, 然后从这组 offset 位置开始消费。

当 Structured Streaming 碰到 Kafka

当你第一次开始消费一个 kafka 流的时候,上述策略任选其一,如果你之前已经消费了,而且做了 checkpoint ,比如你消费程序升级了, 这时候就会从你上次结束的位置开始继续消费。

/ Structured streaming 对kafka的支持 / 

Structured Streaming 对 批处理 和流处理 提供统一的 API, 这一点在我看来是很重要的,我们一套相同的处理逻辑,没必要写出来两套代码,减少了维护成本。 Structured Streaming 从 kafka 拉取消息,然后就可以把 流数据看做 一个 DataFrame , 一张无限增长的大表,在这个大表上做查询,Structured Streaming 给你保证了端到端的 exactly-once,你只需要关心你的业务即可,不用费心去关心底层是怎么做的。

从 kafka topics 中读取消息

首先需要你指定你的数据源,kafka 集群的连接地址,你需要消费的 topic, 指定topic 的时候,可以使用正则来指定,也可以指定一个 topic 的集合,是不是很灵活?我们这里只消费一个 topic

当 Structured Streaming 碰到 Kafka

这个例子里面我们指定消费 topic1, 在 spark 内部 DataStreamReader 会根据你的 配置来 拉取指定的 topic,  kafka.bootstrap.servers (i.e. host:port) 地址   和 topic 这两个参数是必填的。startingOffsets 可以选填,默认是 从 latest 位置开始消费

df.printSchema()  , 我们来打印一些 DataFrame 的 schema

当 Structured Streaming 碰到 Kafka

我们可以看到,schema 里面包含了 kafka record 里面的key value 和 相关 元信息字段,到这时候,我们就可以使用 DataFrame 和 Dataset 的方式来进行转换处理了,有人问,如果 kafka 消息的 value 字节数组里面包含的 是 json 数据怎么办,我们在这里拿到的 key 和 value 都是  字节数组,需要解析一下格式,当然  Spark SQL 内嵌了一些类型处理方法了。

Data Stored as a UTF8 String

把字节数据转为 strings 类型

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Data Stored as JSON

如果你在kafka 里面保存的是 json 格式,我们可以使用内嵌的  from_json 去解析,你需要传入一个 schema, 然后就变成了  Spark SQL  的类型了。

当 Structured Streaming 碰到 Kafka

使用UDF注册反序列化类

当 Structured Streaming 碰到 Kafka

你也可以注册一个 udf 进行反序列化,你可以实现一个 MyDeserialzer ,只要实现了 Kafka Deserializer 的接口即可

 / 输出数据到 kafka / 

往 kafka 里面写数据也类似,我们可以在 DataFrame 上调用 writeStream  来写入 kafka, 指定value, key 是可选的,如果你不指定,就是null。如果 key 为null, 有时候可能导致 分区数据不均匀,需要注意一下。

需要打到哪个 topic, 可以在  D ataStreamWriter 上指定 option 配置, 也可以操作 DataFrame 的时候 在每条 record 上加一列 topic 字段指定。

当 Structured Streaming 碰到 Kafka

这个例子中,DataFrame 中是用户信息数据,我们把 序列化后的 userId 作为 key,然后把所有字段 都转成 json string 格式当做 value


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Language Implementation Patterns

Language Implementation Patterns

Terence Parr / Pragmatic Bookshelf / 2010-1-10 / USD 34.95

Knowing how to create domain-specific languages (DSLs) can give you a huge productivity boost. Instead of writing code in a general-purpose programming language, you can first build a custom language ......一起来看看 《Language Implementation Patterns》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具