RocketMQ架构

栏目: 后端 · 发布时间: 5年前

内容简介:Apache RocketMQ是一个分布式消息和流媒体平台,具有低延迟,高性能和高可靠性,亿万级容量和灵活的可扩展性.它由四部分组成: 名称服务器,代理服务器,生产者和消费者.它们中的每一个都可以水平扩展,而不会出现单点故障.如上图所示.名称服务器集群名称服务器提供轻量级服务发现和路由.每个名称服务器记录完整的路由信息,提供相应的读写服务,支持快速的存储扩展.代理集群代理关注的是消息存储,它通过提供轻量级主题和队列机制来处理消息存储.他们支持推拉模型,包含容错机制(2个副本和三个副本), 并提供原始时间顺序

Apache RocketMQ是一个分布式消息和流媒体平台,具有低延迟,高性能和高可靠性,亿万级容量和灵活的可扩展性.它由四部分组成: 名称服务器,代理服务器,生产者和消费者.它们中的每一个都可以水平扩展,而不会出现单点故障.如上图所示.

名称服务器集群名称服务器提供轻量级服务发现和路由.每个名称服务器记录完整的路由信息,提供相应的读写服务,支持快速的存储扩展.

代理集群代理关注的是消息存储,它通过提供轻量级主题和队列机制来处理消息存储.他们支持推拉模型,包含容错机制(2个副本和三个副本), 并提供原始时间顺序累计千亿条消息的峰值填充和容量.此外,代理还提供容灾,丰富的度量统计数据和报警机制,这些都是传统消息系统所缺少的.

生产者集群生产者支持分布式部署,分布式生产者通过多种负载平衡模式向代理集群发送消息,发送进程支持快速故障和低延迟.

消费者集群消费者集群也支持推拉模式中的分布式部署.它还支持集群消息和消息广播.它提供了实时消息订阅机制,可以满足大多数用户的需求,RocketMQ的网站为感兴趣的用户提供了一个非常简单的快速入门指南.

名称服务

名称服务器是一个功能齐全的服务器,主要包含该两个功能:

  • 代理管理, 名称服务器从代理集群接收注册,并提供心跳机制来检查代理是否存活.
  • 路由管理, 每个名称服务器将保存有关代理集群的整个路由信息和用于客户端查询的队列信息.

如我们所知, RocketMQ客户端(生产者/消费者)将从nameserver查询队列路由信息,但是客户端如何找到nameserver地址的呢?

有四种方式向客户端提供名称服务器地址列表:

producer.setNamesrvAddr("ip:port")
rocket.namesrv.addr
NAMESRV_ADDR

关于更深入的如何找到NameServer地址的,请查看这里

代理服务

代理服务器负责消息存储和传递,消息查询, 高可用保证等.

如下图所示, 代理服务器有几个重要的子模块:

RocketMQ架构
  • 远程处理模块:代理的入口,处理来自客户端的请求.
  • 客户端管理模块:管理客户端(生产者/消费者)并维护消费者的主题订阅.
  • 存储服务模块: 提供简单的API来存储或查询物理磁盘中的消息.
  • 高可用服务模块: 在主代理和从代理之间提供数据同步功能.
  • 索引服务: 根据特定key,建立消息索引,并提供快速消息查询.

部署(Deployment)

本节介绍生产就绪,部署解决方案.一般来说,我们正在部署一个没有单点故障的弹性rocketmq集群.

前提条件(Prerequisite)

在开始本节之前,请确保您已经阅读了快速入门部分,并且熟悉RocketMQ的核心概念和组件.

生产就绪部署

  • 名称服务器

为了确保集群在一个实例奔溃时任然能够正常工作,建议使用两个或多个名称服务器实例,只要有一个名称服务器实例处于活动状态,整个集群就保持服务状态.

名称服务器遵循无共享设计范式,代理将心跳数据发送到所有名称服务器,生产者和消费者可以在发送/消费消息时从任何可用的名称服务器查询元数据.

  • 代理

代理可以根据其角色分为两: 主从. 主代理提供RW(读写)访问,而从代理只接收读访问.

要在没有单点故障的情况下部署高可用RockeMQ集群,应该部署一系列代理集.一个代理集包含一个主代理和几个从代理,其中主代理brokerid设置为0,从代理brokerid设置为非0.一组代理集中都代理有相同的代理名称(brokerName).在极端情况下,在一个代理集中至少需要设置两个代理.每个主题驻留在两个或多个代理中.

配置

部署RocketMQ集群时,建议的配置如下:

Broker configuration

Property Name Default value Details
listenPort 10911 listen port for client
namesrvAddr null name server address
brokerIP1 InetAddress for network interface Should be configured if having multiple addresses
brokerName null broker name
brokerClusterName DefaultCluster this broker belongs to which cluster
brokerId 0 broker id, 0 means master, positive integers mean slave
storePathCommitLog $HOME/store/commitlog/ file path for commit log
storePathConsumerQueue $HOME/store/consumequeue/ file path for consume queue
mapedFileSizeCommitLog 1024 * 1024 * 1024(1G) mapped file size for commit log
deleteWhen 04 When to delete the commitlog which is out of the reserve time
fileReserverdTime 72 The number of hours to keep a commitlog before deleting it
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLVAE
flushDiskType ASYNC_FLUSH {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.

CLI管理工具

RocketMQ提供了一个CLI(命令行界面)管理工具,用于查询,管理和诊断各种问题.

如何获得

管理 工具 是随RocketMQ一起提供,你要么下载一个预构建的二进制版本,要么自己从源代码构建,这样你就拥有它了.

如果您需要源代码, RocketMQ 工具模块包含其源代码.

如何使用管理工具非常容易使用,这里处于演示的目的,假设为 Linux 的环境.

在mq安装目录下的/bin目录中,使用bash命令: mqadmin, 就可以看到以下的帮助菜单:

The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   queryMsgByUniqueKey  Query Message by Unique key
   printMsg             Print Message Detail
   sendMsgStatus        Send msg to broker
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     Clone offset from other group
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config
   deleteKvConfig       Delete KV config
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart)
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   syncDocs             Synchronize wiki and issue to github.com
   allocateMQ           Allocate MQ
   checkMsgSendRT       Check message send response time
   clusterRT            List All clusters Message Send RT

复制代码

主从复制模式

为了确保不会丢失任何成功发布的消息,RocketMQ提供了一种复制模式,通过两种复制方式: 同步和异步,以获得更强的持久性和高可用性.

主从复制: 同步/异步代理

与许多复制系统一样,同步代理要等到提交日志被复制到从服务器后才能确认.相反,异步代理在主服务器上处理消息后立即返回.

如何配置

在conf文件夹下的rocketmq发行版附带了三个预构建的配置供您参考.

2m-2s-sync
2m-2s-async
2m-noslave
复制代码

注意: 所有的配置使用了:异步刷新的方式.

部署

以2M-2S-SYNC的部署为例,首先,启动两个名称服务器,如快速启动部分所示: 假设他们的IP为192.168.0.2和192.168.0.3

开启代理(假设二进制rocketmq位于/home/rocketmq/dist)

>cd /home/rocketmq/dist/bin
>bash mqbroker -c ../conf/2m-2s-sync/broker-a.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
How to verify
Execute the following command to verify according to the CLI section:
> bash mqadmin clusterlist
复制代码

核心概念

RocketMQ架构

根据上面的模型,我们可以深入探讨消息传递系统设计的一些话题:

  • 消费者并发问题
  • 消费者热点问题
  • 消费者负载均衡问题
  • 消息路由
  • 连接多路复用
  • 灰度部署(Canary Deployments)

生产者

生产者将业务应用程序系统生成的消息发送给代理,RocketMQ提供了多种发送模式: 同步,异步和单向传输

生产组

相同角色的生产者被分组在一起.如果原始生产这在宕机崩溃,代理可以联系同一生产者组的不同生产者实例来提交或者回滚.

考虑到所提供的生产者在发送消息时足够强大,每个生产组只允许一实例,以避免不必要的生产者实例初始化.

消费者

消费者从代理中提取消并将消息输入应用程序.

从用户应用的角度来看,提供了两种类型的用户:

推送消费者

另一方面, punsh-consumer封装了消息拉取,消耗进度和维护内部的其他工作,为最终用户留下一个回调接口来实现,该接口将在消息到达时执行.

拉取消费者

拉消费者积极从代理中拉取消息,一旦一批消息被拉取出来,用户应用程序就会启动消费过程.

消费组

与前面提到的生产者组类似,具有完全相同角色的消费这被分组在一起,并命名为消费者组.

消费时一个伟大的概念,在消息消费方面,实现负载均衡和容错的目标非常容易.

注意:消费者组的消费实例必须具有完全相同的主题订阅.

主题

主题是生产者投递消息,消费者拉取消息的一个类别.主题的生产者,消费者的关系非常松散.具体来说,一个主题可以有零个,一个或或者多个向其发送消息的生产者;相反,生产者可以发送不同主题的消息.从消费者角度来看,一个主题可以由零个,一个或多个消费者群体订阅.同样,只要消费组的实例保持订阅一致,用户组就可以订阅一个或多个主题.

消息

消息是要传递的信息.消息必须有一个主题,可以将其解释为要邮寄到的信件地址.消息还可以具有可选的标记和额外的键值对.例如,您可以为消息设置业务密钥,并在代理服务器上查找消息,以诊断开发过程中的问题.

消息队列

主题被划分为一个或多个子主题:"消息队列"

标签

换句话说,标签子主题为生产者提供了额外的灵活性.对于标签,来自同一业务模块的具有不同目的的消息可能具有相同的主题和不同标记.标签有助于保持代码的整洁和一致,而且标签还可以帮助RocketMQ提供查询系统.

代理

代理时RocketMQ系统的主要组成部分,它接收来自生产者的消息,存储它们,并准备处理来自消费者的请求.他还存储与消息相关的元数据,包括消费组,消费进度偏移量和主题/队列信息.

名称服务器

名称服务器用作路由信息提供者.生产者/消费者客户端查找主题以查找相应的代理列表.

消息模型

  • 集群
  • 广播

消息顺序

当使用DefaultMQPushConsumer时,您可以决定是有序的或者是并发的消费消息.

  • 有序的 按顺序消费消息意味着,消息的消费顺序与生产者为每个消息队列发送的顺序相同,如果您正在处理全局顺序是必需的场景,请确保您使用的主题只有一个消息队列.

注意:如果指定了按顺序消费,则消息消费的最大并发性是消费组订阅的消息队列数.

  • 并发的 当并发消费消息是,消费的最大并发性仅仅受每个消费者客户端指定的线程池的限制.

注意:在此模式下,不再保证消息的顺序

代理的最佳实践

对使用者的一些有用方法.

代理角色

代理角色有:异步主节点,同步主节点和从节点. 如果你不能容忍消息丢失,我们建议你部署同步主机并在其上附加从属服务器.如果您对丢失可以容忍,但希望代理始终可用,则可以将ASYNC_MASTER和SLAVE一起部署,如果你只想简单一点,你可能只需要一个没有从节点的异步主机.

刷新磁盘类型

建议使用异步刷新,因为同步刷新时非常昂贵的,会造成太多的新能损失.如果你想要可靠性,我们建议您使用同步主机和丛机.

生产者的最佳实践

对使用者的一些有用的方法.

发送状态

当发送一条消息,你将会得到一个发送状态和一个发送结果,首先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK", 以下是每个状态描述列表:

FLUSH_DISK_TIMEOUT:刷新超时时间

如果代理设置了MessageStoreConfig的FlushDiskType=SYNC_FLUSH(默认是ASYNC_FLUSH),并且代理未在messagestoreconfig的syncflushtimeout默认为5秒)内完成磁盘刷新,则将获得此状态.

FLUSH_SLAVE_TIMEOUT:刷新从节点超时

如果代理的角色时同步主机(默认异步主机),如果从机代理没有在MessageStoreConfig的syncFlushTimeout(默认时5秒)时间内异步刷新,则将获得此状态.

SLAVE_NOT_AVAILABLE:从机不可用

如果代理的角色是同步主代理(默认时异步主代理), 但是没有配置从属代理,则将会获得这个状态.

SEND_OK:发送成功

发送"确定"并不意味着它是可靠的,为确保不会丢失任何消息,应该启用同步主机或同步刷新.

Duplication or Missing: 重复或丢失

如果您得到了flush-disk-timeout,flush-slave-timeout,并且代理在此时正好关闭,那么您可以发现您的消息丢失了.这个时候你有两种选择,一个是让它离开,这可能导致这个消息丢失,另一个时重新发送消息,这个可能会导致消息重复.通常,我们建议重新发送并在使用时找到处理重复删除的方法.除非你觉得当一些信息丢失时没有关系.但是请记住,当没有从属主机时,重新发送是无用的,如果发生这种情况,宁应该保留场景并警告集群管理器.

超时

客户端向代理发送请求,并等待响应,但是如果最大等待事件已经过去,并且没有返回响应,则客户端将抛出一个RemotingTimeoutException.默认等待时间时3秒,你可以通过使用send(msg, timeout) 方法来设置超时时间来代替send(msg).注意,我们不建议等待时间设置得太短.因为代理需要一些时间来刷新磁盘或与从属服务器同步.

另外,如果该值超过了syncflushtimeout太多,则效果可能很小,因为代理可能会在超时之前返回FLUSH_SLAVE_TIME 或 FLUSH_SLAVE_TIMEOU

消息体大小

我们建议消息提的大小时不超过512K.

异步发送(Async Sending)

默认的send(msg方法在接收到返回响应消息之前,将会被一直阻塞,因此,如果宁关心性能,我们建议您使用send(msg, callback), 它将以异步的方式进行工作.

生产组(Producer Group)

通常,生产者组是没有影响的.但是如果你加入了一个事物,你就应该注意,默认情况下,在同一个JVM中只能创建一个具有相同生产组的生产这,这通常已经足够了.

线程安全

生产者是线程安全的.您可以业务解决方案中使用它.

性能

如果你希望在一个JVM中有多个生产者进行大数据处理.我们建议:

  • 使用几个生产者异步发送(3-5个就足够了)
  • 为每个生产者设置实例名称

消费的最佳实践

为用户提供一些有用的提示

消费组和订阅

首先,你需要注意的是,不用的消费组可以独立的消费同一个主题,并且每个消费组都有自己的消费补偿.请确保同一组中的每个消费者订阅相同的主题.

消息监听器

Orderly

消费者将锁定每个消息队列,以确保按顺序逐个消费它.这个将导致性能损失,但当你关系消息的顺序时,它是有用的.不建议抛出异常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.

Concurrently

顾名思义,使用者将并发消费.建议使用它以获得良好的性能,不建议抛出异常,你可以使用ConsumeConcurrentlyStatus.RECONSUME_LATER 来进行替代.

Consume Status

对MessageListenerConcurrently来说,你可以返回 RECONSUME_LATER 告诉消费者你现在不能马上消费它,想稍等一会儿再消费它,然后你可以继续消费其他消息,对MessageListenerOrderly来说,因为你关心的是顺序,你不能跳过某一条消息,但是你可以返回 SUSPEND_CURRENT_QUEUE_A_MOMENT (暂停当前队列)来告诉消费者等待一会儿

Blocking

不建议阻塞监听器,因为它会阻塞线程池,最终可能会导致消费进程的停止.

线程数量

消费者使用 ThreadPoolExecutor 来处理内部消费,因此您可以通过 setConsumeThreadMin 或者 setConsumeThreadMax 来改变最大消费线程数和最大消费线程数.

从哪里开始消费

当一个新的消费组建立时,它将需要决定是否需要消费已经存在与代理中的历史消息.

CONSUME_FROM_LAST_OFFSET: 这个配置将会忽略历史消息,并消费之后产生的任何内容.

CONSUME_FROM_FIRST_OFFSET: 这个配置将会消费代理中存在的所有消息.

CONSUME_FROM_TIMESTAMP: 这个配置将消费指定时间戳之后再生成消息.

重复

许多情况可能导致重复,例如:

  • 生产者重复发送(例如: 在FLUSH_SLAVE_TIMEOUT的情况下)
  • 消费者关闭,某些补偿机制未及时更新到代理

因此,如果应用程序不能容忍重复,你可能需要做一些外部工作来处理这一问题,例如: 通过检查数据库的主键.

名称服务的最佳实践

在ApacheRocketMQ中,名称服务器被设计为协调分布式系统的每个组件,协调主要通过管理主题路由信息来实现.

管理主要由两部分构成:

  • 代理定期更新保存在每个名称服务器中的元数据
  • 名称服务器为客户端服务,包括生产者,消费者和命令行客户机提供最新的路由信息

因此,在启动代理和客户端之前,我们需要告诉他们如果通过名称服务器提供名称服务器地址列表来访问名称服务器.在Apache RocketMQ中可以通过四种方式实现.

编程的方式

Java配置项

名称服务器地址列表可以通过在启动之前指定后继 Java 选项rocketmq.namesrv.addr来提供给应用程序.


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C++标准库(第2版)

C++标准库(第2版)

Nicolai M. Josuttis / 侯捷 / 电子工业出版社 / 2015-6 / 186.00元

《C++标准库(第2版)》是全球C++经典权威参考书籍时隔12年,基于C++11标准的全新重大升级。标准库提供了一组公共类和接口,极大地拓展了C++语言核心功能。《C++标准库(第2版)》详细讲解了每一标准库组件,包括其设计目的和方法、复杂概念的剖析、实用而高效的编程细节、存在的陷阱、重要的类和函数,又辅以大量用C++11标准实现的实用代码范例。除覆盖全新组件、特性外,《C++标准库(第2版)》一......一起来看看 《C++标准库(第2版)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

MD5 加密
MD5 加密

MD5 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具