RoP 重磅发布 0.2.0 版本: 架构全新升级,消息准确性达 100%

栏目: 软件资讯 · 发布时间: 2年前

内容简介:作者简介: 冉小龙-腾讯云中间件团队研发工程师 Apache Pulsar committer RoP 作者及 Maintainer Apache BookKeeper contributor Apache Pulsar Go client 作者 Apache Pulsar Go Functions作者 StreamNative/pul...

作者简介:

冉小龙-腾讯云中间件团队研发工程师

Apache Pulsar committer

RoP 作者及 Maintainer

Apache BookKeeper contributor

Apache Pulsar Go client 作者

Apache Pulsar Go Functions作者

StreamNative/pulsarctl 作者

摘要

日前,腾讯云中间件团队联合StreamNative社区正式发布了RoP 0.2.0版本,该版本在架构上全新升级,用户在使用中可以完全避免消息丢失、消息重复消费、只能消费一部分 Partition 的数据等问题。

RoP的定义

与 KoP、MoP 和 AoP 相似,RoP 是一种可插拔的协议处理插件。

将 RoP 协议处理插件添加到现有 Pulsar 集群后,用户无需修改代码,便能将现有的 RocketMQ 应用程序和服务迁移到 Pulsar,同时还能使用 Pulsar 的强大功能,例如:

计算与存储分离

多租户

跨地域复制

分层分片

轻量化计算框架 -- Pulsar Functions

•...

发布RoP 0.2.0

2021年5月17日,腾讯云中间件团队向社区贡献了 RoP 0.1.0 的 beta 版本,RoP(RocketMQ on Pulsar) 是 将 RocketMQ 协议处理插件引入 Pulsar Broker,这样 Pulsar 即可支持原生的 RocketMQ 协议,RocketMQ 用户可以无缝迁移到 Apache Pulsar

今天,我们重磅发布RoP 0.2.0 ,该版本在架构上全新升级,在功能和稳定性上得到了更大的提升。提供了 ACL 鉴权和验证的功能,可以更好的确保用户数据的安全性,同时允许用户对 Partitioned Topic 进行扩容,可以获得更好的并发写入能力, 并且完善了 RocketMQ 原生的管控端接口,可以更好的对服务进行处理和监控。

最新功能优化

在0.2.0版本中,腾讯云中间件团队在0.1.0的架构上进行全新设计,对MessageID 、消息路由模型进行重构,确保不同场景下 RoP 消息的准确性。

主要有以下三点优化内容:

  1. 支持 RoP ACL 功能

ACL 机制是RocketMQ 社区自带的一个能力,可以很好的对用户的数据进行鉴权和认证。 RoP 0.2.0 版本复用了 RocketMQ 自身的 Hook 实现,利用 Pulsar 自身的鉴权机制,实现了对用户数据进行鉴权和认证的功能。

RoP ACL 的使用方式依旧延续了 RocketMQ 的使用方式,只需定义 ACL_ACCESS_KEY 和 ACL_SECRET_KEY 字段,然后利用 RocketMQ 的 ACLRPCHook 函数加载即可,这样可以确保用户尽可能少的改动客户端的业务代码逻辑。

具体代码示例如下:

    private static final String ACL_ACCESS_KEY = "eyJrZXlJZCI6InJvY2tldG1xLW13bmI3bWFwMjhqZSIsImFsZyI6IkhTMjU2In0."

            + "eyJzdWIiOiJyb2NrZXRtcS1td25iN21hcDI4amVfdGVzdCJ9.BDOjqqY25a6apnZTMZCqg0I0pxVFcqz7fvZbaTqkf5U"; // token

    private static final String ACL_SECRET_KEY = "rop";



public static void producer() throws MQClientException {

        DefaultMQProducer producer = new DefaultMQProducer("rocketmq-mwnb7map28je|nit", "ProducerGroupName",

                getAclRPCHook());

...

    }



    static RPCHook getAclRPCHook() {

        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));

    }

- ACL_ACCESS_KEY:即用户在 Namespace 级别下创建的 Token。

- ACL_SECRET_KEY:固定值,在 RoP 内部解析时,不会使用到这个字段。

 

  1. 重构 MessageID

RocketMQ 与 Kafka 类似,都是使用 64 位的 Offset 来唯一标识一条消息,但是在 Pulsar 中,使用 64 位的 LedgerID、64 位的 EntryID 来唯一标识一条消息。针对这个问题,在 RoP 0.1.0 中,我们使用如下的形式来构造 MessageID 对象:

RoP 重磅发布 0.2.0 版本: 架构全新升级,消息准确性达 100%

PartitionID: 8 位,可以允许一个 Topic 最多创建 256 个 Partitions

LedgerID: 32

EntryID: 24

使用如上的方式可能存在 MessageID 的消息精度丢失,在系统运行一段时间之后,无法继续创建出新的 LedgerID,导致整个集群的服务对外不可用的情况。这个问题与早期的 KoP 版本所面临的是同样的困境,所以在 RoP 0.2.0 中,我们采用了和 KoP 相同的处理方式,使用 [PIP 70: Introduce lightweight broker entry metadata](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) 的处理思路,在 Broker 的协议头中,附加了一个 64 位的 index/publish-time 字段,这样无需在客户端侧进行协议的解析即可在每一条消息中附加一个 64 位的字段来使用。

PIP-70 是使用插件的方式进行加载的,所以在服务启动时,我们需要做如下配置:

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

> Note: Broker Entry Metadata 是在 Pulsar 2.8.0 的版本中才支持的,所以需要确保 Pulsar Broker 的版本在 2.8.0 及以上。

需要说明的是,RocketMQ 和 Kafka 在 Offset 的使用方式上又有所不同,RocketMQ 中有两个 Offset,一个是 Queue Offset,用来表示消息在 MessageQueue 中的位置, MessageQueue 本质上是一个数组,一条消息进来数组的下标就会 +1。一个是 CommitLog Offset 用来表示消息存储在 CommitLog 中的位置,消息存储是由 ConsumeQueue 和 CommitLog 配合完成,ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。 Topic 下的每个 MessageQueue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。

所以,在 MessageID 重构的实现中,区别于 Kafka 中只有一个全局的 Offset 来标识消息的唯一性,在 RoP 中需要针对这两种 Offset 的情况分别进行处理,具体如下:

RoP 重磅发布 0.2.0 版本: 架构全新升级,消息准确性达 100%

  • RESERVED_BITS: 1 字节的保留位,避免第一个字节出现负数等情况导致 Offset 计算有误。
  • RETRY_TOPIC_TAG_BITS:1 字节的标记为,用来标识这个 Topic 是否为 Retry 类型的 Topic。
  • PULSAR_PARTITION_ID_BITS:10 字节的 Partition Num,用来记录一个 PartitionedTopic 下有多少个 Partitions,最大支持 1024 个 Partitions。
  • OFFSET_BITS:52 字节用来标识消息的 Offset。

 

  1. 重构消息的路由模型

在 RoP 0.1.0 的版本中,在消息路由的实现上,RocketMQ 和 Pulsar 都是首先通过 Topic Lookup 的操作找到对应的 Owner Broker 节点,然后将该 Broker 的地址返回。但是在这个动作中,忽略了一个重要的问题,即 RocketMQ 与 Kafka 和 Pulsar 都是不同的,它的 Queue 不是全局唯一的。

RocketMQ 路由协议主要包括两部分:

  1. Broker服务的 IP 地址信息;
  2. 某个Broker上对应的 Topic 分区总数以及分区可读写信息。

RocketMQ 路由协议中,没有全局标识 Topic 的分区的唯一ID(例如在Pulsar/Kafka中,分区 ID 在集群中是唯一的);而在 RocketMQ 中,分区路由信息是由 Broker 标识加上该 Broker 上的顺序从 0→N 的 Index 来标识 Topic 的分区。

因此 RocketMQ 协议中,客户端只需要获取到 Topic 对应 Broker 上分区总数,就能通过计算获得该 Broker 上分区的 ID;所有的请求都是基于【Broker-Tag】+【Broker-Topic-Seq】构建唯一路由查询原语来请求服务。简单来说:RocketMQ的分区是有状态的,他绑定在特定的Broker之上;分区一旦分配在某个Broker上,终身与之相关且不能迁移。客户端解析分区路由信息是通过计算得到;比如:某个TopicA有5个分区,分别落在BrokerA和BrokerB上,BrokerA有3个,BrokerB有2个;那么协议记录为(BrokerA,3)(BrokerB,2),客户端通过计算就得到全部的分区数据:

  • BrokerA-TopicA-0,BrokerA-TopicA-1,BrokerA-TopicA-2
  • BrokerB-TopicA-0, BrokerB-TopicA-1;

由于上面的路由关系的原因,所以没有办法通过 GET_ROUTEINTO_BY_TOPIC 这个协议请求去和Pulsar的Lookup协议去做映射。本质原因是像Kafka/Pulsar这种,它的Partition信息是全局唯一的,在执行 Topic 路由策略之后,能准确的返回某一个Topic 的 Partition所对应的Owner Broker是谁。但是RocketMQ的Topic路由返回的是两个字段,一个是Broker Name,一个是Queue的数量。具体的QueueID,是Client根据Broker返回的数量固定的从0开始递增计算。所以在Topic的路由映射中,RocketMQ和Pulsar自身的路由协议没办法一一映射。为了解决这个问题,在 RoP 0.2.0 中,抽象了一层 Proxy 用来维护 Topic 与 Broker 之间的映射关系。为了达到这个目的,这里主要有以下几方面的事情需要考虑:

  1. 这些映射关系存储在哪里?
  2. 如何分配路由关系?
  3. 当路由关系发生变化之后,如何处理?

针对第一个问题,综合考量,我们选择将路由的映射关系存储到 ZooKeeper 集群中来,因为当前 RoP 的服务本身也需要依赖 ZooKeeper 集群,不会引入新的组件;其次 ZooKeeper 自身的一致性能力能很好的满足这个场景需求。

针对第二个问题,我们是在 RoP 接口创建分区主题的同时,依次查找各个分区所在的 Broker 节点,依照初始主题所在节点信息为基准,将映射关系写入到 ZooKeeper 集群中。这样做的好处在于:

  • 复用Pulsar自身分区分配机制计算的结果,实现简单。
  • 初始分配后,虚拟节点和物理节点处于一个节点上,性能好。
  • 如果配合路由关系重平衡能力,可以实现最优性能。

针对第三个问题,我们通过增加 Master-Slave 模式,可以减少单节点故障对系统的影响。ZooKeeper元数据如下,只需要增加Broker相关信息,即可实现各个节点的互为主从关系,达到主节点不可用时从节点可以继续提供服务。由于当前 Offset 信息都存储在Compact Topic中,全部节点同时订阅,所以各个节点的元数据可以保证一致,可以实现主从切换。下面是测试环境中部署 RoP 集群中的路由映射关系:

RoP 重磅发布 0.2.0 版本: 架构全新升级,消息准确性达 100%

所以,为了保证 RoP 集群能有较好的容错能力,在部署 RoP 集群中建议使用偶数台节点。可以通过如下参数配置决定当前 Master 节点有几个 Slave 节点作为其备份节点:

RoPBrokerReplicationNum=2

假设有 6 台 Broker 节点,RoPBrokerReplicationNum=2, 那么就说明此时只有三台 Master Broker 节点对外提供服务。但是对于 Pulsar 来说,Broker 节点之间是对等的,当创建 Topic 的时候,可能会分配到任意节点上,所以对于不在Owner Broker节点上的请求,在 RoP Proxy 层做了一层代理,会先对该 Topic 进行查找的操作,然后将请求转发到 Owner Broker 的节点上来返回。

未来规划

为了更好的践行开源协同和开源共建的理念,目前,上述功能均已贡献回社区。除此之外针对 RocketMQ 商业版本的任意延迟消息功能,腾讯云中间件团队也基于 Pulsar 原生的特性开发了相关的插件来进行支持。RoP 的延迟消息功能除了支持多级别的延迟消息之外还具备支持任意延迟消息的能力。

之后,腾讯云中间件团队将会在确保 RoP 项目稳定的同时,持续开发 RoP 相关的功能,诸如消息轨迹,消息查询和回溯以及监控等能力,进一步完善 RoP 的功能以及周边生态。

RoP 项目地址:https://github.com/streamnative/rop

特别鸣谢

感谢腾讯云中间件团队韩明泽张勇华对本文提供的技术细节校验和支持。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

How Tomcat Works

How Tomcat Works

Budi Kurniawan、Paul Deck / BrainySoftware / 2004-4-1 / USD 54.95

A Guide to Developing Your Own Java Servlet Container一起来看看 《How Tomcat Works》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具